HDFS特性之RaidNode与EC

众所周知, RAID(磁盘阵列) 是广泛运用在服务器端的一种提升存储性能的技术, 在HDFS中的高版本(HDFS3)中也有类似的重要特性 EC

(纠错码技术), 但是高版的EC是从内部设计的, 对HDFS的存储结构, 读写影响都很大, 无法兼容旧数据, 所以日后再单独体系的讲

先来看看早在HDFS 低版本 (0.2)时期已经出现了外挂RAID降低存储成本的方案: 也就是今天的主人公”RaidNode”, 可以说它是EC的前身.

0x00. 背景

HDFS中有个核心的点就是它依靠默认的 3副本 来保证数据可靠性, 换言之就是说在HDFS上存储 1份数据 要占 3份 的实际空间, 这对磁盘存储的消耗是很大的, 特别是放在SSD这种比较昂贵的存储介质上. 那么很自然的, 大家想到将磁盘RAID的技术运用到HDFS上, 以达到降低实际存储占用的效果, 如下图所示:

最早在2009年, Facebook 就开源了自己的 实现 , 并很快贡献给了HDFS社区 引入 了HDFS-0.21的版本中(后续被移除), 一般我们现在提到的各种版本RaidNode都是基于Facebook这一版做了改动而来的, 所以主要是学习它的设计和理念. (其中EC相关的概念可参考 此篇 或官方的 这篇文章 , 或者 国内 版)

那么Facebook-Raidnode的工作原理是什么呢? 简单说它有两种校验模式, 一种是XOR(类似RAID5), 另一种是RS(里所码), 一般采用的是 RS模式 , 二者对比图如下:

RS模式, 是指通过对 N个 数据块进行 运算 . 算出 M个 校验块, 最多可以容忍 M个 块的丢失. 丢失的块可通过现存的N个块重新算出恢复, 这种方式存储成本最低, 但计算和恢复成本较大.

然后我们遵循 Facebook 官方的定义, 把数据块称作” source data “, 把校验块又称作” parity(奇偶) data “, 采用它推荐的 “10+4” 的组合, 参见下图:

它采用的 RS (Reed-Solomon)-EC编码方式原理可以先放一下, 最直观的理解就是, 以前10个block, 我需要耗费 3 倍的存储空间, 而现在我只需要 1.4倍 的空间, 可以说节省了 100% 的存储占用, 而且这总共14个block中, 可以容忍最多4个块的丢失, 还能从剩余的块把它们计算恢复, 优点是显而易见的.

听起来很神奇, 但随之就有一系列的疑问, 比如:

  • 这是如何做到的, 使用场景是什么?
  • 原有的块副本去哪了? 是直接把副本数设置为1了么?
  • Raidnode又是如何使用外挂的结构呢?
  • 使用Raidnode方案, 有什么代价, 目前又有哪些不足?

下面会一一说明, 先来看看Raidnode的整体架构.

PS:Raidnode大部分参考资料质量都不太好, 官方 Wiki 也被移除了, 已有的里面推荐两个 (若访问失败可以通过 Google快照 )

  1. Facebook官方的 Slide (EC图片引用)
  2. 比较正式的结构和细节 Slide

0x01. 整体架构

首先, Raidnode整体也是 C/S 结构的设计, 有一个Server也就是Raidnode进程, 然后可由Client端发起请求触发操作, server也定期自动触发操作, 如下所示:

  1. Server端
    • RaidNode
    • 块替换策略
  2. Client端
    • DistributedRaidFileSystem
    • Raid Shell

整体如下图所示:

不过这里需要注意的是, 实际生产环境里, 为了简化RaidNode使用, 可能会弃用它的Client, 只需要RaidNode自行定期检查–>工作.

RaidNode涉及与NameNode、DataNode、Corona/Yarn集群通信

  • 其中与NameNode通信的主要目的是 获取 需要归档的文件、获取待修复文件、获取降幅本文件以及删除过期的校验数据等
  • 与DataNode通信是为了 避免 多个raid相关的block同时落在同一个DataNode中 (这个是直接发RPC给DN?)
  • 与Corona/Yarn集群通信则是为了提交任务进行生成校验块, 修复丢失块,归档等 (降副本实际是NN做的)

0x02. 具体结构

A. RaidNode

1. 功能

那么可以推测出, 之所以低版本的Raid实现是比较独立的, 就是因为主要的组件是单独的一个进程, 那么先来看看核心进程RaidNode做哪些事:

  1. 定时读取配置文件, 得知哪些目录需要被Raid
  2. 定时的与NN通信, 对配置Raid的目录扫描, 然后主要做以下3个事:
    • 生成校验文件 (MR)
    • 降低数据文件副本
    • 定时清理过期的校验文件 (比如数据文件被删后)
  3. 提交归档(合并成HAR)任务, 以减少校验文件的INode占用

2. 模块

RaidNode主要由几个重要线程类组成:

  1. ConfigManager:定时加载由 raid.config.file 配置
  2. TriggerMonitor (主要):定期检查配置的raid目录,然后提交MR作业, 是主要模块
  3. BlockIntegrityMonitor:定时检查raid后文件的完整性,有异常则提交修块的MR任务
  4. PurgeMonitor:定时检查是否有源文件NN中被删除了, 有则将其对应校验文件删除
  5. PlacementMonitor: 定时检查, 确保raid后的块尽可能分散在不同的DataNode上 (DN间移动block)
  6. HarMonitor: 定时扫描 校验文件 , 提交归档任务以节省NN的INode占用.

其中 1~6 个组件都对应RaidNode中的不同 后台线程 , 在RaidNode进程启动后就会依次初始化它们, 以实现在定期检测工作的效果.

PS:除此之外, 还有StatsCollectorThread和HTTP Server模块, 但是并非重点, 暂时略过, 同时原版中3的名字似乎是 BlockFixer , 后续调整了, 说明一下

B. JobTracker

这里说的 JobTracker 不是一个Raidnode中具体的类, 它是往调度平台提交 Map-Reduce 任务的任务调度器, 简单理解 即可, 我们关注以下几个问题:

  1. 为什么任务要以MR方式提交?
  2. 如何提交并执行的? (关键)
  3. MR任务执行成功/失败后续?

C. DRFS & RaidShell

DRFS和RaidShell的关系差不多就跟我们之前DFS和FShell关系一样, Shell提供了一层封装方便使用, 本质是一体的, 但是因为这部分不一定必要, 所以生产环境中你可能会发现Raidnode-Client部分被 弃用 了, 也没多少影响, 就是说不能用户自己去提交命令给 Raidnode 指定修复某个块了.

1. DRFS

DRFS(DistributedRaidFileSystem)其实就是在客户端Raid的具体FileSystem实现, 它主要功能是:

  1. 允许客户端读”坏的数据块”
    • 可捕获 BlockMissingChecksumError 的异常
    • 可重新生成缺失的块
  2. 但读数据时不会立刻去修复丢失的块, 仅仅是让读操作成功完成. (TODO)

2. RaidShell

一个快捷的命令行工具, 主要功能是:

  1. 发送块恢复指令
    • 重建丢失的块
    • 将重建的块发给Datanode
  2. Raid的FSCK命令
  3. 管理员相关命令

0x03. 部署使用

了解了前面的大体铺垫后, 先来具体用一下, 看看它到底是怎么运作的, 熟悉环境 (记得 提前准备 好HDFS+Yarn的环境, 最好是3个至少节点)

1. 配置相关

首先是配置相关, 它确定了当前的RAID策略, 以及具体的校验文件存放等, 先在 hdfs-site.xml 添加配置项(其他可 参考 )

<configuration>
<property>
    <name>raid.config.file</name>
    <value>/path/to/conf/raid.xml</value>
</property>
<property>
    <name>raid.policy.rescan.interval</name>
    <value>10000</value>
</property>
<property>
    <name>raid.classname</name>
    <value>org.apache.hadoop.raid.DistRaidNode</value-->
</property>
<property>
    <name>raid.blockfix.classname</name>
    <value>org.apache.hadoop.raid.DistBlockIntegrityMonitor</value>
</property>
<property>
    <name>hdfs.raidrs.locations</name>
    <value>/archive-raid/meta/rs</value>
</property>
<property>
    <name>hdfs.raid.stripeLength</name>
    <value>10</value>
</property>
<property>
    <name>hdfs.raidrs.paritylength</name>
    <value>4</value>
</property>
<property>
    <name>hdfs.raid.block.move.simulate</name>
    <value>false</value>
</property>
<property>
    <name>raid.distraid.synmaxfile</name>
    <value>0</value>
</property>
<property>
    <name>raid.file.permap</name>
    <value>1</value>
</property>

</configuration>

然后就是配置 raid.xml , 举个具体例子 (多个目录需要配多个policy项)

<configuration>
  <policy name = "test-raid">
    <srcPath prefix="hdfs://namenodeIP:port/archive"></srcPath>
         <erasureCode>rs</erasureCode>
      
         <property>
          <name>srcReplication</name>
          <value>2</value>
          <description>
          pick files for RAID only if their replication factor is greater than or equal to this value.
          </description>
        </property>
      
        <property>
          <name>targetReplication</name>
          <value>1</value>
          <description>
          after RAIDing, decrease the replication factor of a file to this value.
          </description>
        </property>
      
        <property>
          <name>metaReplication</name>
          <value>1</value>
          <description> the replication factor of the RAID meta file</description>
        </property>

        <property>
          <name>modTimePeriod</name>
          <value>1800000</value>
          <description> 一个文件多长时间未被修改才会RAID--选冷数据 </description>
        </property>

       <property>
          <name>stripeLength</name>
          <value>10</value>
       </property>
  </policy>
<configuration>

可以看到它就是配置了几个关键的参数项, 注释也写的很清楚, 从而确定了Raidnode运行的具体模式

2. 启动运行

这里有两种方式可以启动, 一种是传统(原版0.20)方式通过自带的脚本, 然后你可以用raid-shell去操作, 还有一种是升级后高版中去掉了raid-shell, 直接通过 hadoop jar raid.jar xxx.RaidNode 的方式启动, 都说一下.

A. 使用默认脚本启动

关键的当然就是启动/停止RaidNode进程, 但是要注意Raidnode可能还会间接启动一个单独的 Archive 进程

# hadoop根目录下, 注意此脚本仅限原版0.2X才有
bin/start-raidnode.sh  # stop对应停止

# 启动后jps观察应该有
*.RaidNode

B. 使用直接提交Jar包方式启动

这个方式适用于改进版本, 以及高版适配后不能使用脚本启动的方式,

# 先自己编译raidnode修改后的项目打个jar包
# 在配置好的hadoop包下直接jar方式启动
bin/hadoop jar /tmp/raid.jar xx.xx.xx.raidnode.RaidNode

# 启动后只显示Jar进程
*.RunJar

C. 检验结果

那如何判raidnode运行正常了呢? 有Client端/Raidshell, 最简单的方法就是用 raidshell 测试一下, 此外还可以有两点通用的检测方式:

  1. 观察你在hdfs-site中配置的 hdfs.raidrs.locations 对应的路径是否有生成文件

    # 这里具体的raid路径,和raid后路径以实际为准
    bin/hadoop fs -ls -R /archive* /tmp /raid
    
    # 正在raid时候, 应该会在/tmp下生成临时校验文件, 完成后移动到/archive-raid中
    # 假设我们/archive/raid目录下有两个大文件1.big和2.big, 那么执行raid时
    /tmp/raidrs/1.big-3308087870674951347
    /tmp/raidrs/2.big8835042559380365685
    
    # 执行完后, 应该存在
    /archive-raid/meta/rs/archive/raid/1.big
    /archive-raid/meta/rs/archive/raid/2.big
    
  2. 你配置需要降副本的目录, fsck查看副本数是否降为1了. 当然其实高版本 HDFS-shell 使用 ls 时就会在第二列显示副本数, 更加直观.

0x04. 代码实现

代码主要就说Server端的 Raidnode 了, Client的实现暂略. 由于Raidnode的本地实现基本不具有实际可用性, 所以先只看分布式的 DistRaidNode 实现

1.ConfigManager

它只做一件事, 就是定时判断是否重新 加载配置文件 , 线程的 run 方法的如下:

public void run() {  
    while (running) {  
        try { // 检测间隔, 默认10s
            Thread.sleep(reloadInterval); 
            reloadConfigsIfNecessary();  
        } catch (Exception e) {
            LOG.error("Failed to reload config file", e); 
}}}

// 判断文件是否修改过, 修改过就重新加载
public synchronized boolean reloadConfigsIfNecessary() { 
  long time = HighTideNode.now(); 
  if (time > lastReloadAttempt + reloadInterval) { 
  lastReloadAttempt = time;  
  File file = null;   
  try {      
    file = new File(configFileName); 
    long lastModified = file.lastModified(); 
    if (lastModified > lastSuccessfulReload &&
            time > lastModified + RELOAD_WAIT) {  
    reloadConfigs(); 
    lastSuccessfulReload = time;   
    lastReloadAttemptFailed = false;  
    return true;    
       }}catch (Exception e) {
    lastReloadAttemptFailed = true;
       }}  
  return false;
}

2.TriggerMonitor实现

上面reload配置文件后, 具体做事的线程是TriggerMonitor, 它会定期扫描配置目录, 并调用 DistRaid 以MR方式提交raid任务:

Trigger线程主要做的如上图左侧所示, 它扫描目录和读配置的地方不是重点, 关键就 两点 :

  • 初始化任务
  • 执行任务

下面先看看整体的代码结构, 然后后续围绕这两点再细说:

 // TriggerMonitor循环调用doProcess()方法
 private void doProcess() {
     ArrayList<PolicyInfo> allPolicies = new ArrayList<>();   
 for (PolicyInfo info : configMgr.getAllPolicies()) allPolicies.add(info);
   
     while (running) {
       Thread.sleep(10000); // 10s
       // 1. 是否需要重新加载配置
       boolean reloaded = configMgr.reloadConfigsIfNecessary();
       if (reloaded) clearAndAddPolicies(configMgr.getAllPolicies());
 
       
       // 2. 遍历raid.xml文件中的所有目录配置
   for (PolicyInfo info: allPolicies) {
       skipAlreadyScanedInfo(info.getName());
       if (!shouldSelectFiles(info)) continue;
           
       List<FileStatus> filteredPaths = null;  
       try { // 2.1 扫描得到待raid的文件列表
           filteredPaths = selectFiles(info, allPolicies);
       } catch (Exception e) {continue;}
       if (filteredPaths == null || filteredPaths.size() == 0) continue;
       
       try { // 2.2 准备任务和提交任务(核心)
           raidFiles(info, filteredPaths);
        } catch (Exception e) {continue;}
      }
} // end while
 }

 // DistRaidNode调用raidFiles(), 实际流程都在DistRaid类中(之后细说)
 @Override
 void raidFiles(PolicyInfo info, List<FileStatus> paths) throws IOException {
   DistRaid dr = new DistRaid(conf);
   dr.addRaidPaths(info, paths);
   // 准备job & 启动job都在此
   boolean started = dr.startDistRaid();
   if (started) {
     jobMonitor.monitorJob(info.getName(), dr);
   }
 }

 public boolean startDistRaid() throws IOException {
   if (setup()) { // ← 准备Job
     this.jobClient = new JobClient(jobconf);
     this.runningJob = this.jobClient.submitJob(jobconf);
     LOG.info("Job Started: " + runningJob.getID());
     return true;
   }
   return false;
 }

上面 selectFiles() 的实质是调用 RaidState.check() 来根据多个条件筛选待raid文件, 因为不是重点, 暂不细说, DistRaid的具体调用下面单独说.

备注:在Facebook原版代码中, 准备任务前还有个切分文件并封装为 EncodingCandidate 的操作, 详见 源码 , 但是后续改良的版本里把这部分去掉了, 原因暂不确定, 所以引的图片我也做了对应调整.

3. DistRaid (核心)

DistRaid简单来说是一个封装了MR作业具体操作的类, 基本所有作业相关都是在这里定义和实现的, 它有三个比较关键的点:

setup()
DistRaidInputFormat
DistRaidMapper

1. 预处理文件

预处理做除了一些基本job参数设置, 主要做的是, 把一个目录下待raid的 多个 文件, 转换为 一个 SequenceFile 格式的特殊文件, 类似的做法也在 distcp 实现中采用了, 不过我们这里待raid的文件都是大文件, 合并它们应该不是存储占用上的考虑, 且它也没使用压缩, 具体是什么原因还不太清楚..

private boolean setup() throws IOException {
    estimateSavings();
    // 1. 给每个任务一个不同的地址
    final String randomId = getRandomId();
    JobClient jClient = new JobClient(jobconf);
    Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);

    jobconf.set(JOB_DIR_LABEL, jobdir.toString());
    Path log = new Path(jobdir, "_logs");

    // 控制文件的块大小应该小于普通文件, 目的是让map的负载更均匀(?)
    jobconf.setInt("dfs.blocks.size",  OP_LIST_BLOCK_SIZE); // 32MB
    FileOutputFormat.setOutputPath(jobconf, log);

    // 生成操作list
    FileSystem fs = jobdir.getFileSystem(jobconf);
    Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
    jobconf.set(OP_LIST_LABEL, opList.toString());
    int opCount = 0, synCount = 0;
    SequenceFile.Writer opWriter = null; // writer负责生成seq文件

    try {
      opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
                                           PolicyInfo.class, CompressionType.NONE);
      for (RaidPolicyPathPair p : raidPolicyPathPairList) {
        // 如果目录第一次做raid时就有大量的文件, 那么这些文件很可能也有类似的大小
        // 此时, 先随机打混一次会使文件分散更均匀
        java.util.Collections.shuffle(p.srcPaths);
        for (FileStatus st : p.srcPaths) { // 依次写入乱序后的文件
          opWriter.append(new Text(st.getPath().toString()), p.policy);
          opCount++;
          if (++synCount > SYNC_FILE_MAX) { // MAX默认值10, 多调整为0(见QA)
            opWriter.sync(); // 写一个SYNC标记(4+16=20字节)
            synCount = 0;
          }
          if (opCount >= MAXFILE_PER_TIME) { // 一次raid最多写多少文件, 默认10
        	  break; 
          }
        }
      }

    } finally {
      if (opWriter != null) opWriter.close();
      fs.setReplication(opList, OP_LIST_REPLICATION); // 控制文件增到10副本. why?
    }
    
    raidPolicyPathPairList.clear();
    jobconf.setInt(OP_COUNT_LABEL, opCount); // opCount=文件数
    // 根据文件数和配置比例, 设置当前作业的map数
    jobconf.setNumMapTasks(getMapCount(opCount,
                                       new JobClient(jobconf).getClusterStatus().getTaskTrackers()));
    if (jobpool != null) jobconf.set("mapred.fairscheduler.pool", jobpool); 
    LOG.info("jobName={}, numMapTasks={} jobpool={}"...);
    
    return opCount != 0;
  }

这里只看代码会显得过于抽象, 也不理解为什么实际应用里会把 SYNC_FILE_MAX 改为0(每写一个文件就写一个 SYNC 标记), 其实这是一个原版的BUG, 会导致严重的map读取倾斜, 如下图所示: (基于原图修改)

解释一下上图的意思: 理想情况下每个map读取的文件应尽可能均匀, 但由于采用了SequenceFile的格式设计, 它对应使用的读取器 SequenceFileRecordReader 是按 SYNC 标志为每次读取”开始-结束”的, 所以当设置为 10个文件 写一个SYNC标记, 就可能造成 map1 读取了 两个SYNC 内10个文件, 而其他的map读不到数据的情况, 也就是图中间的例子. 图最右部分就是改成每写入一个文件写一个SYNC标记. (这里还有疑问, 包括seq文件为啥没有看到? seq文件一定合并为1个么?)

2. DistRaidInputFormat实现

类似 distcp 的MR实现一样, DistRaid也是 只有map阶段 , 没有reduce阶段的, map的实现就是 DistRaidMapper , 之后再看,

简单说InputFormat接口规定了MR阶段输入文件的规范, 实现类需要通过两个方法来确定 读什么 , 怎么读 :

  • getSplits (): 按给定规则 切分 源文件, 切成一条条split数组 (就好比把一张纸切成N条, 每条有M个记录)
  • getRecordReader (): 对split后的数据条 再拆分 , 转为map具体读取的一条条K-V数据, 核心 next() 方法

然后Client提交任务后, JobTracker会把数据条分给对应的mapper处理, 下面来看看 getSplits() 的实现: (代码重构过, 原始版本 参考 )

/* 切分核心关注两点:
 * 1. 怎么切: 主要通过SeqFile内置的reader.next(), 通过SYNC标记来确定切分的数据点
 * 2. 切多少: 这取决于具体计算的细节, 需要再确认一下. (numSplits)
 */
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) {
  final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
  final int targetcount = srcCount / numSplits;

  Path srcs = new Path(job.get(OP_LIST_LABEL, ""));
  FileSystem fs = srcs.getFileSystem(job);
  List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);

  SequenceFile.Reader in = null;
  Text key = new Text();
  PolicyInfo value = new PolicyInfo();
  long prev = 0L;
  int count = 0;
  
  // 新增代码段
  int[] numPerSplit = new int[numSplits];
  for (int i = 0; i < numSplits; i++) numPerSplit[i] = targetcount;
  for (int i = 0; i < (srcCount - numSplits * targetcount); i++) numPerSplit[i]++;
 
  // (新增)引入一个临时的map容器, key是block的DN_IP, value是block数量值
  HashMap<String, Integer> map = null; 
  try { // 这里本质是循环调用SeqFile的next()切分SeqFile文件, 涉及到SYNC的标记判断
    for (in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value);) {
      // 1. 获取数据源文件对应的block信息
      BlockLocation[] locations = new BlockLocation[0];
      if (fs.exists(new Path(key.toString()))){
        locations = fs.getFileBlockLocations(params...);
      }
      if(map == null) map = new HashMap<String, Integer>();

      // 2. 依次遍历文件的所有块, 记录每个DN的raid块数
      for(int i = 0; i < locations.length; i++){
    	  String[] locs = locations[i].getHosts();
          // 把每个块对应的3个DN-IP, 和此IP对应有几个block数记在map中
    	  for(int j = 0; j < locs.length; j++){
    		  if(map.containsKey(locs[j])){
    			  int num = map.get(locs[j]);
    			  map.put(locs[j], ++num);
    		  }else{
    			  map.put(locs[j], 1);
    		  }
    	  }
      }
      
      long curr = in.getPosition();
      long delta = curr - prev;
      // 3.达到阈值后切分, 先从刚记录的DN-IP中, 选出存储最多block数的一个
      if (++count >= numPerSplit[splits.size()]) {
        count = 0;
        int max = 0;
        String bestLoc = null;
        Iterator<Entry<String, Integer>> iter = map.entrySet().iterator();
        while(iter.hasNext()){
      	  Entry<String, Integer> entry= iter.next();
      	  if(max < entry.getValue()){
      		  max = entry.getValue();
      		  bestLoc = entry.getKey();
      	  }
        }
        String[] locs = null;
        if( bestLoc != null){	  
          locs = new String[1];
          locs[0] = bestLoc;
        }
        // 将存储最多block的DN-IP作为host信息, 记在切分的数据条中(意义是?)
        splits.add(new FileSplit(srcs, prev, delta, locs));
        prev = curr;
        map.clear();
      } // end-count-if

    } // end-for-loop
  } finally {
    in.close();
  }
  return splits.toArray(new FileSplit[splits.size()]);
}

刚才的 getSplits() 先把 SeqFile 从一个大文件切成若干个数据条( FileSplit 对象), 切分方式类似于:

split1: SYNC1 --> file1 --> file2 ...-->file10 (起始位置A)

split2: SYNC2 --> file11 --> file12 ...-->file20 (起始位置A+B)

split3…..同上直至写完

接下来把每个数据条传入 SequenceFileRecordReader 的读取具体 K-V 对的方法中,这样map才能直接读取一个个 K-V 对. 我们需要 溯源 一下底层实现:

  @Override
  public RecordReader<Text, PolicyInfo> getRecordReader(InputSplit split,
                                                        JobConf job, Reporter reporter) {
    // 从这可以看到SequenceFile的K-V的结构, key是文件名, value是数据和元信息
    return new SequenceFileRecordReader<Text, PolicyInfo>(job, (FileSplit) split);
  }

// SeqFile是默认就有具体实现类的, 这部分其实是MR体系的内容, 也一并学习一下
// 疑问是看起来这里并没有读取具体的K-V信息, 只是读了SYNC做了验证, 那在哪读的具体K-V呢?
// getRecorded到底是被谁调用, 怎么调用, 多少次之类的...(待确定)
public SequenceFileRecordReader(Configuration conf, FileSplit split) {
  Path path = split.getPath();
  FileSystem fs = path.getFileSystem(conf);
  // 初始化reader
  this.in = new SequenceFile.Reader(fs, path, conf);
  this.end = split.getStart() + split.getLength();
  this.conf = conf;

  // 在getSplits中, 每个split都会有一个相对于整个SeqFile的位置(start)
  // 如果当前split起始位置比当前reader的位置大, 那么跳到下一个SYNC标记的位置(见后)
  if (split.getStart() > in.getPosition()) in.sync(split.getStart());      

  this.start = in.getPosition(); // 更新起始位置
  more = start < end;
}

  public synchronized void sync(long position) throws IOException {
    // 读到SeqFile尾部了, 跳到尾部结束
    if (position + SYNC_SIZE >= end) { 
      seek(end);
      return;
    }

    // 读SeqFile第一条数据时, 单独判断
    if (position < headerEnd) {
      in.seek(headerEnd);
      syncSeen = true;
      return;
    }
    // 普通情况
    try { 
      // 1. 先读取SYNC标记
      seek(position + 4); // 先跳4字节SYNC前缀
      in.readFully(syncCheck); // 读16字节的bytes作为对比数据
      int syncLen = sync.length; // 16
      // 下面看起来是做校验, syncCheck似乎是确认SYNC完整性的
      for (int i = 0; in.getPos() < end; i++) {
        int j = 0;
        for (; j < syncLen; j++) {
          if (sync[j] != syncCheck[(i+j) % syncLen]) break;
        }
        if (j == syncLen) {
          in.seek(in.getPos() - SYNC_SIZE); // 跳回20字节前(读SYNC标记前)
          return;
        }
        // 更新syncCheck值? 没明白
        syncCheck[i % syncLen] = in.readByte();
      }
    } catch (ChecksumException e) {
      handleChecksumException(e);
    }
  }

这里简单看 SequenceFileRecordReader 的构造方法还不是很清楚如何读的每一个K-V, 其中V在 raidnode 中应该就是一个个的文件.

未完待续

3. DistRaidMapper实现

这是map的具体实现, 因为raid的过程只执行map阶段, 所以它的实现等于就是raid的MR实现, 是 怎么做 的关键, 也就是如何 生成校验文件 .

  1. 获取目前raid所需的所有信息
  2. 对文件进行一系列检查, 确认满足raid条件
  3. 生成校验(parity)文件

下面是map的实现源码, 核心其实就是去调用 doRaid() 方法, 然后统计一下执行结果.

  @Override
  public void map(Text key, PolicyInfo policy,
                  OutputCollector<WritableComparable, Text> out,
                  Reporter reporter){
    this.reporter = reporter;
    try {
      Path p = new Path(key.toString());
      FileStatus fs = p.getFileSystem(jobconf).getFileStatus(p);
      st.clear();
      RaidNode.doRaid(jobconf, policy, fs, st, reporter); // 核心调用doRaid方法

      ++succeedcount;

      reporter.incrCounter(Counter.PROCESSED_BLOCKS, st.numProcessedBlocks);
      reporter.incrCounter(Counter.PROCESSED_SIZE, st.processedSize);
      reporter.incrCounter(Counter.META_BLOCKS, st.numMetaBlocks);
      reporter.incrCounter(Counter.META_SIZE, st.metaSize);

      reporter.incrCounter(Counter.FILES_SUCCEEDED, 1);
    } catch (IOException e) {
      ++failcount;
      reporter.incrCounter(Counter.FILES_FAILED, 1);

      String s = "FAIL: "+policy+key+StringUtils.stringifyException(e);
      out.collect(null, new Text(s));
    } finally {
      reporter.setStatus(getCountString());
    }
  }

// 看看关键的doRaid实现 (本地模式和MR共用此逻辑)
public static boolean doRaid(params...) {
  Path p = stat.getPath();
  FileSystem srcFs = p.getFileSystem(conf);

  // 1. 从HDFS找到对应文件的block位置
  BlockLocation[] locations = srcFs.getFileBlockLocations(stat, 0, stat.getLen());
  
  // 再次检查文件块数
  if (locations.length < 3) return false;

  // 计算文件占用总的磁盘空间(多个副本)
  long diskSpace = 0;
  for (BlockLocation l: locations) {
    diskSpace += (l.getLength() * stat.getReplication());
  }
  statistics.numProcessedBlocks += locations.length;
  statistics.processedSize += diskSpace;

  // 2. 生成校验文件(细节比较多) 见图
  generateParityFile(params...);
    
  if (!doSimulate) {
    if (srcFs.setReplication(p, (short)targetRepl) == false) { // 再次设置为1副本
      LOG.info("Error in reducing replication of " + p + " to " + targetRepl);
      statistics.remainingSize += diskSpace;
      return false;
    };
  }

  diskSpace = 0;
  for (BlockLocation l: locations) {
    diskSpace += (l.getLength() * targetRepl);
  }
  statistics.remainingSize += diskSpace;

  // meta文件的块数取决于文件自身块数(默认文件块数10)
  int numMeta = locations.length / stripeLength; 
  if (locations.length % stripeLength != 0) numMeta++;

  // 每个文件都会生成meta文件, 它的最后一个block可能没完成, 但是暂时无视(meta文件没明白,做什么的?)
  statistics.numMetaBlocks += (numMeta * metaRepl);
  statistics.metaSize += (numMeta * metaRepl * stat.getBlockSize());
  return true;
}

关键在于第三点, 如何生成校验文件, 如图所示:

  1. 创建一个临时的目录和临时文件(tmp)
  2. 通过 ParallelStreamReader并行 读取多个源数据block, 用于生成校验文件, 对应4个校验块
  3. 第一个块先直接写到 HDFS
  4. 后续的块 写到 本地 的临时文件中
  5. 再调用 RS算法将这些block追加到之前存储第一个block的HDFS文件中
  6. 清理收尾, 之后将源文件副本设置为1, 实际是委派给了NN执行 (默认),

图中EC编码过程也就是代码的 Encoder.encodeFile() , 是生成校验文件的核心, 进入方法后的核心是 encodeFileToStream() , 再进入后最后落到 encodeStripeImpl() 方法, 里面使用多线程对应多个输入流, 最后对每一小块字节进行具体编码是 encodeStripeParallel() , 这个之后补充个 时序图 , 调用就会清晰许多.

时序图待补…

ParallelStreamReader 是并行读取源文件block的关键, 它的实现还稍微复杂一些, 又分了几个关键成员:

boundedBuffer

这里面主要的内容其实分为两块, 一块是多线程编程相关, 用到了线程池, 有界队列和信号量, 这里不单独叙述, 等并发编程专题单独说, 另一部分是IO细节, 包括如何操作每一部分字节并进行编码, 暂时跳过, 代码后续再补, 可以先看看整体的结构图 (细节在RSEncode和RSDecode中)

如上图所示, 解释一下:

  • 源文件的每 1 个块对应1个 输入流 , 对应到ReadResult就是用一个二维字节数组 readBufs[blockLength][bufferSize] 来存储读取每一个块的数据
  • 默认每一次读取 1M 的数据, 就调用encode编码一次
  • 校验文件的 第一个块 会通过输出文件的输出流直接输出到DN中
  • 剩余的块会保留在 本地 后期再追加到检验文件中
  • 生成校验文件后会把它与源文件的 lastUpdateTime 设为一致, 以确定降幅本成功(啥意思?) (重要)

4. BlockIntegrityMonitor

1. 整体结构

上面的 TriggerMonitor 主要做的是对普通文件raid, 是 编码 的过程, 那么 BlockIntegrityMonitor 就是做相反的事恢复丢失的块, 是 解码 的过程.

它主要实现类是由Work线程类的两个子类:

  • CorruptionWorker: 定期与NN通信, 确定raid的源文件和校验文件是否有 块损坏 的, 有则提交MR任务进行解码恢复损坏的块
  • DecommissioningWorker: 同HDFS里的概念, 这里是部分DN被安排下线后的块状态, 提交逻辑和上面几乎一样 (任务优先级稍低)

这里之后补个图看看整体流程, 优先级计算不是重点就不说了, 关键在于下面的 23 , 也就是提交了的修块任务如何执行的, 代码很多

2. ReconstructionInputFormat

同之前提到的, 这里主要也是 getSplits() 确定如何切分输入. 篇幅还挺长..

3. ReconstructionMapper

这是 修块 的实际map实现类, 要注意的是, 根据文件不同类型, 修复逻辑各不一样

  • 源文件
  • 校验文件
  • 归档 后的校验文件

推测最麻烦的应该是归档文件的修复? 这里的篇幅还挺多的其实… 先暂时跳过把

未完待续…

5. PlacementMonitor与PurgeMonitor

PlacementMonitor与PurgeMonitor配合使用完成删除过时的校验文件以及将Raid文件均匀分散到不同DN的功能。

6. HarMonitor

它做的主要是, 定时扫描需要归档的 校验文件 , 通过archive方式把他们合并为若干个大文件(har包), 以此来减轻大量校验文件对Namenode的压力, 关注几个点:

  1. 合并后的校验文件, 如果要读取(恢复), 是否会麻烦?
  2. 合并之后, 原有的多个校验文件合成了一个大文件, 那就需要额外对的映射 管理 , 是否麻烦?

0x05. 高版迁移

顾名思义, 高版迁移就是把Raidnode引入到高版本的HDFS上, 虽然这里说的代码大部分已经是修改或优化过的版本, 但是还是不能直接在高版上直接跑, 这里涉及到几个问题.

  1. 相关RPC是否兼容
  2. Yarn3.X需要适配(有些方法需要修改)
  3. 旧的HTTP服务是抛弃还是修复让它可以运行
  4. DN信息是否可以正常获取
  5. 生成校验文件是否完整, 恢复是否正常.

那为什么要做这件事, 迁移到HDFS2/3这种可能已支持EC的高版本上呢? 主要还是因为高版本EC(新EC)存在一些关键问题:

  1. 开启新EC之前的旧数据都无法读取, 只能在全新的集群使用
  2. 新EC开启后, 因为整个存储结构变化, 有一系列操作如 append/hflush/hsync 都无法再使用, 而这些API可能被上层的Hbase等直接调用
  3. 新EC的线上稳定性还没有广泛验证, 特别是在巨大规模的存储量下, 大家都在摸着石头过河ing
  4. 旧的EC虽然有种种问题, 但是它是几乎外挂 解耦 的, 方便管理操作, 有问题解决起来较容易, 新版EC完全cover的人都还不多

所以在目前的情况来说, 先让旧版的EC方案能在高版本HDFS3跑起来, 然后等新EC经过充分测试和 研究 后, 再使用是符合生产环境要求的. 然后下一篇单独记录迁移问题中需要修改的点. (包括Yarn的部分)

0x06. 问题与展望

上面介绍了RaidNode相关内容, 主要是说的它的优点, 下面来看看它的不足, 和为什么社区需要新的EC特性, 以及未来的发展趋势

1. 硬资源成本

低版本提交的RaidNode, 存在 两个 明显的物理资源占用:

  • 非常高的CPU编码/解码资源占用 (高版的EC通过支持Intel自带的指令集 ISA-L 可以显著降低, 可参考 官方Benchmark )
  • 数据恢复时, 大量的网络带宽消耗

软件层面可以通过检测集群的 空闲时段 (比如夜间), 主要在这段时间做生成/恢复block的操作. 从而提高资源的合理使用.

其他层面的话, 一般是只将其用在 偏冷 (访问次数少, 文件生成时间早)的数据集群, 减少其劣势出现的几率, 最典型的例子就是各种 备份集群 , 大文件( 云盘 )存储集群.

2. 软资源成本

除了直观可见的硬件上的资源占用, 它还有隐性一些的软件层面的资源占用, 同样 不可小觑 :

  1. 每1个满足raid条件的文件, 都会生成1个对应的校验文件(以及对应目录), 存在Namenode中, 等于说元数据INode可能会翻倍

    解决方案:考虑到读校验文件很少, 所以可对校验文件合并归档(Archive), 从而大幅减少INode占用

  2. RaidNode会定期的与NN通信, 确认哪些文件需要raid, 已被删除的raid的文件, 以及缺失的块信息等, 如果访问周期较短, 也会对NN-RPC造成一定压力. 此外Raidnode如果发现某个DN上存储文件的 source block 过多, 就会将它移到其他DN上, 但是此时NN并不会更新block-DN的映射, 就会导致读数据出现异常重试.

    解决方案:根据线上NN的实际访问情况, 调整/延长RaidNode的各种周期检查时间, 比如扫描周期应大于DN的块汇报时间.

  3. RaiNode引入后, 新增/删除一个文件的块, 在NN端分配DN的时候都会更加麻烦, 原因在于raid后的文件的块需要尽可能保证不存在同一个DN上, 以免某个DN挂掉导致多个block同时丢失, 默认的NN选择块的策略并非如此. 此外, 这个还会严重影响NN的启动耗时, 因为NN启动需要重建元信息, 对比增删block.

    解决方案:启动NN的时候先采用默认的block分配策略, 完成后再修改为考虑Raid的分配策略, 动态刷新生效 (这个如何做的? 待确认 )

  4. raid后的文件块同样还会影响到原有的balance机制, 默认的balance不会考虑raid, 从而可能把多个 source block 放在一个DN

    解决方案:在NN上添加新的RPC接口, 用于查询block所属文件, 并结合raid的块选择策略, 将其均匀分散 ( 待确认 )

  5. Raid完成后, 原本文件的三副本实际降为1副本, 潜在的影响有 本地读 概率降低, 从而增加计算作业的执行时间的可能

    解决方案:本质同硬件资源, 只对冷数据采用Raid可明显降低此影响

当然功能上说, 它也有一系列的小问题, 比如因为它会归档校验文件的机制, 使得操作归档后的校验块就会很麻烦.

3. “小文件”问题

要知道上面RS模式讲的是以”10+4”为基准的, 那么需要考虑一个问题, 10个块的文件有4个校验块, 那5个块的文件呢? 2个块的文件呢?

实际情况是, 集群中会存在大量的” 小文件 “(block<3), 那它还能获得 1.4X 的效果么? 答案显然是 不能的 , 下图是两种校验方案不同block数的节省资源对比图:

  • 一个文件对应的block越小, 通过RAID能节省的空间就越少
  • 一个文件少于 3个 block, 那它基本不能节省任何空间 (<3个block的文件, 就在这被定义为”小文件”)

要知道3个block及以上的文件至少是 256MB +, 大部分场景中, 小于256MB的文件肯定是不少的, 所以这个问题全局影响是很大的, 比如你的网盘中, 虽然会有电影和一些大安装包单文件占用上G, 但是大部分存储的仍然是小文件, 比如照片库可能总共十几G, 是由上万张每个几MB的图片组成. 按文件粒度, 它们都无法被Raid

那如何解决这个问题呢? 有思路是把Raidnode的操作单元从 文件级别 , 提高到 目录级别 , 这样一个图片目录下的许多小文件就可一起被Raid, 详细可参考 此篇 .

参考资料:

  1. HDFS-RaidNode细化-Slide(Recommend)
  2. Introduction to HDFS Erasure Coding in Apache Hadoop(一)_Recommend
  3. HDFS Erasure Coding in Production(二)_Recommend
  4. Facebook-github-HDFS0.20
  5. JIRA-HDFS-503-Facebook-RaidNode-Commit
  6. HDFS-Raid问题优化相关
  7. HDFS的EC技术引入
  8. 通过HDFS-RaidNode节省存储空间
我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章