视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
HDFS分析
2025-09-30 23:21:48 责编:小OO
文档
1)写流程分析 ---1、数据节点选择机制、管道建立机制、块持久化机制、包括块一致性和准确性校验机制等

2)读流程分析 ---1、块的选择机制、块的一致性和准确性校验机制、读取策略等

3)写的异常处理及恢复机制

4)读的异常处理和恢复机制

5)namenode与datanode的存储结构,及通信机制

6)不同数据节点之间,block的Copy机制

7)块的一致性校验机制

8)机架感知策略

9)0.21.0  backupnamenode的实现原理及作用

10)扩容和退服策略

11)数据重新平衡策略

12)安全模式---进入安全模式的机制

RPC通信

系统中的通讯,例如,读客户端向NN请求文件block信息.DN向NN请求任务(副本复制).任务向TT汇报执行进度,TT向JT汇报执行进度.

ClientProtocol:客户端调用协议,涉及文件操作,DFS管理,升级(DFSAdmin)

ClientDatanodenodeProtocol:更新块大小,传输中恢复操作,获取本地路径

DatanodeProtocol:DN与NN通讯协议,注册,BlockReport,心跳,升级

NamenodeProtocol: SN和NN通讯协议,通知NN使用新的fsimage和edit

RefreshAuthorizationPolicyProtocol, RefreshUserMappingsProtocol 

版本控制

1,底层RPC高低版本头.

2,业务上的接口VersionedProtocol.getProtocolVersion高低版本协议方法变动.

数据序列化反序列化

Client

Client代理模式,调用RPC.getProxy实际上返回的一个代理对象,当调用方法的时候实际调用的是Invoker, Invoker将协议,调用的方法名,参数,参数类型封装成Invocation->Call(id,返回值)对象经过client发送到server,放入map,并读取返回流,根据流中的id,判断是服务器返回的是那次调用的结果.

    Connection线程负责读取Server返回值,根据读取的id,从map中获取call,设置返回值,唤醒调用线程(call.notify),调用Client的线程发送完后会wait(Call)直到Connection获取到返回值.

读取时候如果超时(ipc.ping.interval)就发送一次ping,如果没有出现IOException就继续读取,Conneciton可以根据标识(地址,用户组信息,协议)共用.

Server

基于NIO,Listener关注OP_ACCEPT事件,当有客户端连接过来,Accept后,从readers(treadpool)中选取一个Reader将客户端Channel注册到Reader中的NIO selector,并新建一个Connection对象关联客户端Channel,Reader关注OP_READ事件.

客户端建立连接后,首先发送的是ConnnectionHeader包含协议名,用户组信息,验证方法,Connection会根据以上信息进行校验.之后将是先读取4byte的长度代表这次请求的数据的长度,然后一直等待事件触发读取够长度,将读取的数据 解码为调用id和param,新建一个Call对象(关联Connection)放入call队列中,handlers中的Handler会将Call中callQuene中取走,Call中的param实际为Invocation对象,包含调用方法名,参数名,参数类型,由这些信息使用Java反射API调用Server的instance对象,获取返回值,组织返回数据,写入Call的response(ObjectWritable)属性中,马上调用responder的doRespond方法,将Call加入到Connection的responseQuene最后,如果responseQuene长度等于1做一次NIO写操作,如果不能一次性能够将数据写完,将客户端channel注册到responder关注写事件OP_WRITE,下一次读取responseQuene的第一位写(保证一致),如果长度不为1证明该channel已经注册到了responder了直接加入队列,由responder线程后续处理.

NOTE:客户端关闭流后出发一次读操作,返回为-1,Server关闭连接

readers个数,handler个数,callQuene深度

为什么要NIO

CurCall与获取客户端IP

    Handler获取一个Call后,会将Server的curCall(ThreadLocal类型)设置为当前的Call,调用Instance方法实际是在Handler线程中,在Instance的方法内就可以使用Server提供的方法来获取客户端IP

清理

Responder

长时间未发送到客户端的响应,注册到responder的Call如果长时间没有发送到客户端,每隔一段时间会清理掉

Reader

超过最大ipc.client.idlethreshold

总连接数超过多少后,开始清理空闲连接

ipc.client.kill.max

一次最多清理多少个空闲连接

过一下线程

NameNode/DataNode数据存储结构

checkpoint

 

回收站(),仅仅针对Shell hadoop fs,如果直接调用程序接口,没这个功能.  与普通文件系统一致.

(广度,深度,版本1,Integer.MAX_VALUE ,2可做配置)

INode实现了Comparable接口,用来排序,查找.

插入时候按照大小顺序,先找出位置在插入.所以会有以下顺序

查找每层二分查找. Collections.binarySearch

2,block->blockInfo对应

Block 有3个long型的属性blockId(随机生成)numBytes(块大小),generationStamp

BlockInfo继承Block添加了2个属性,实现了用户LightWeightGSet的LinkedElement接口

inode:引用文件Inode

    triplets:3Xreplication的数组,即replication 个组,每组有3个元素,第一个指向DatanodeDescriptor,代表在这个DN上有一个Block,第二个和第三个分别代表DN上的上一个blockInfo和下一个blockInfo

DatanodeDescriptor有一个属性blockList指向一个BlockInfo,因为每个BlockInfo中的triplets中有一组记录着对应的DN上的上一个,下一个BlockInfo,所以从这个角度来看BlockInfo是一个双向链表.

    

LightWeightGSet

Gset 类似Set但提供get方法,主要用于存储BlockInfo,根据Block找到BlockInfo

其中一个实现LightWeightGSet,利用数组存储元素,用链表解决冲突问题,类似HashMap但是没有ReHash操作

BlocksMap初始化LightWeightGSet时候,会根据JVM内存将数组的大小初始为最大能占用的内存数(4194304 -Xmx1024M)加上高效的hash映射算法, LightWeightGSet在BlockInfo数量比较小的时候get性能逼近数组.

BlockInfo继承Block,没有重写hashCode和equals方法,在Block中equals方法只要求传入的对象是Block实例并且blockId相等,就认为两个对象相等,故存储BlockInfo时候分配的在数组中的Index和Get时候由Block的hashCode定位是一致的.

 

low memory footprint, No rehash will be performed

节点选择

HDFS’s placement policy is to put one replica on one node in the local rack, another on a node in a different (remote) rack, and the last on a different node in the same remote rack.

NN负责

第一个节点选择如果是客户端所在主机就是DN,优先选择DN,否则随机.

第二个节点选择同第一个节点在不同一个机架的DN.

第三个节点选择第二个节点所在机架上的另外一个DN.

剩下的随机.

同时满足:

    1,DN状态正常,退役中,挂掉

    2,空间足够(通过心跳传给NN)

3,DN负载状态,通过有多少活动连接计算,体现在DataXceiver的个数(通过心跳传给NN)

4,一个机架上放的不能太多.只能放 (totalNumOfReplicas-1)/numOfRacks+2;  尽量均匀,并满足上述第二节点.

写文件流程

校验单元Chunk(数据和校验信息)

传输单元Packet(头信息,传输单元{0,n})  (0心跳包(dataQueue=0))

1,通过RPC向NN请求建立一个文件,NN做一系列的检查,通过后将文件信息记录到EditLog中,flush到硬盘.

2,通过RPC向NN请求向文件新加一个block,NN生成blockId, generationStamp,按照策略选取n(副本)个DN

3,根据返回的DN,建立PipeLine,连接第一个DN,发出传输头信息,获取InputStream

    DN上有DataXceiverServer监听端口,传统模式, 处理block数据流(不同于RPC处理小数据量,元数据的通信),每接收到一个连接,新建一个DataXceiver线程处理, DataXceiver根据,报文头中的操作指令(read block ,write block…)来作具体的操作,这里是write block,根据传递过来的信息主要做3件事情

1,在blobksBeingWritten建立block文件,如果出错会抛出异常,关闭流,上一个节点的第3步会出错

2,取第一个剩余节点,作为下一个节点,向下一个节点传送重新封装(改变报文中的剩余节点)的报文头,如果出错,向上一个节点返回出错代码和DN

3,获取下一个节点的InputStream,如果出错,向上一个节点返回出错代码和DN,如果没出错将读取到的反馈信息返回到上一个节点.

4,DataStreamer从dataQuene取出Packet传输,并放入ackQuene中,ResponseProcessor处理反馈信息,接收到一个Packet的反馈之后将其从ackQuene中移除.

5,DN中的DataXceiver接受到Packet后会根据Packet中的检验区和数据区校验数据,重要每次写的时候是根据offsetInBlock取数据文件和meta文件的位置写入(处理情况, dn2挂掉后,client没获取到dn3接收到packet_X的反馈,恢复PipeLine后继续向dn3发送packet_X),flush到硬盘, 设置可见长度visibleLength,将Packet发送到下一个节点,然后将Packet放入ackQuene中由PacketResponder处理, PacketResponder接受下一个节点的Packet的反馈,移除并且向上一个节点反馈,如果反馈的Packet是Block的最后一个,meta文件,block文件flush到硬盘,关闭.通知NameNode记录block信息(并没有),NameNode更新block->blockInfo对应.

调用sync记录通知NN记录block,无论你写了多大,只有sync,close前的数据是保险的.

例如M一个块,你写了200M,但是你没sync, close此时NN重启,文件大小为0.

此时如果客端挂掉,等到过了LEASE_HARDLIMIT_PERIOD,会启动恢复操作,文件大小为200M.

正在写的块的处理,无blockInfo对应,建立PipeLine的时候,NN分配DN时候记录到InodeFileUnderConstruction中.

读取反馈超时状态:

    获取下一节点的InputStream时候会有一个超时参数,为后续节点的个数X 3 * 1000;

租约:

在此过程中,DFSClient会有一个线程不断的更新租约在软超时时间内.

主节点负责恢复块过程:

    联系其他的DN,令其停止block的写线程,获取block的元信息,主要是大小.(然后比较其大小,取最小的设置为block的长度.)向NameNode请求一个新的generationStamp,新包装一个block设置长度和generationStamp,分别更新到对于的DN,DN操作:修改meta文件名,按长度对应截取数据文件和meta文件.最后更新NameNode中的block信息

根据需要要求NN记录关闭文件日志.

generationStamp为保持一致性,有可能其他DN上挂掉之后重新加入集群,此时它的block可能已经不是最新的.

例如:DN已经将block移动到current中 ,DFSClient由于网络原因(DN断网了)没有收到DN的反馈,认为DN挂掉,开始恢复块,重新生成了generationStamp.

负载

建立PipeLine时候出错

    通知NN抛弃建立的Block,将重新请求NN向文件添加一个block,并把错误的DN传递给NN,NN在这次请求中排除.

传输数据时候DN出错

    keepLength=false(当append的时候keepLength=true)

    取最小的节点(compare by host:port)作为主节点,开始恢复块的过程.

    剔除出错的DN,用剩余的DN建立PipeLine,此时报文头中传送的recoveryFlag为true,

    1,如果文件已经移动到了current目录,涉及到升级,current, previous目录下为硬连接.如果是升级后的block,将block复制到detached目录下,然后    重命名为current目录下的文件名.返回供写.这样保证了previous目录下有一份升级前的副本.

    2,如果文件还是在blobksBeingWritten目录,使用原来文件.

    3,如果还没建立文件,建立文件.

最小的块与重新发送的逻辑.

Packet持久化到硬盘后发送反馈

同一时间,最小的副本,会大于已经发送的长度(dataQuene中已经发完,并且从ackQuene中移除了),而写的时候是按在块中的位置来写的.

DFSClient出错

    DFSOutputStream关闭的时候FSNamesystem会移除租约,记录文件关闭日志.可能是在写的过程中出错.FSNamesystem就没有关闭.NameNode租约失效之后, Lease Monitor会检查其中过了硬租约(软超时是要有Client去create或append一个文件时才会去判断)期限的文件,开始做收尾工作.

出错情况1:从未没有block写入,blocks.length==0

        移除租约, InodeFileUnderConstruction->InodeFile,记录关闭文件日志,检查下副本数.

出错情况2: block写入过程中

        那么NN上的InodeFileUnderConstruction会有targets,分配一个主节点,将租约私有,在下一次心跳的时候将block和targets 封装成DNA_RECOVERBLOCK指令发给主节点,主节点做具体恢复操作.

NameNode出错

    如果在写入块的过程中,NameNode出错,没有影响.当写下一个块的时候,DFSClient会因为获取不到分配的节点而出错.

    因为没有关闭文件,NameNode启动的时候会将这个文件作为正在创建的文件,恢复其租约,失效的时候发起恢复过程.

    如果没有调用sync那么block信息不会保存到日志文件中,nn挂掉重启之后当前正在写的块丢失.

Append文件

记录一个OPEN日志

将INodeFile转换成InodeFileUnderConstruction

将最后一个块的节点设置为targets.

最后一个块分配主节点开始恢复块(实际stamp)(3个副本,其中一个副本DN不可用,后来又加入集群中)

Append时候第一次建立PipeLine时候出错

恢复的时候保留长度,不存在dataQueue,ackQueue

读文件

API介绍:

    read(byte buf[], int off, int len)尽力读取(跨block)长度为len的数据到buf的offset,返回读取的字节数.

从NN获取block的信息.

客户端如何读取到最近的节点

这里并没有按照树形结构来算. pseudoSortByDistance

NN返回block的时候会根据集群和Reader将返回的locations进行排序. 

Reader与block在同一机器上,Reader和block在同一机架上.

Reader不在集群中随机.

首先从NN获取副本信息,如果文件还正在创建中,reader联系返回的第一个dn更新最后一个block的长度,与nn返回的文件长度比较,确定文件有多大.

根据当前位置,定位到block,根据block按(优先)尝试联系DN,建立连接.

(version:short:2)+(type:byte:1)+( bytesPerChecksum:int:4) +671088/512*4=524295

十六进制200->十进制512

读异常处理

读的时候保持当前位置pos,如果读取过程中出现IOException的话,重新选取节点再次读取.

读的时候的检验机制FSInputChecker负责校验数据,(它的子类负责读取到检验单元),并且根据校验信息校验.

例如文件A有68M,打开文件

seek到512+256字节处,读取1024个字节,那么实际传输的是从第512字节-第2048字节,按校验单元来传送的.

在这个基础上又加上了缓冲,一次发送的最大校验单元数,根据transferToAllowed(dfs.datanode.transferTo.allowed默认65536,否则(4096)来.小于的一个packet发完.大于的分packet发送.所以传送的可能是第512字节到512+65536(4096) 个字节.

(NIO效率)

seek的优化:

    Seek导致的结果是关闭blockReader,然后在重新打开一个blockReader,期间需要打开端口做初始化一些事情,开销大,对于seek如果目标离当前位置小于TCP缓冲大小(128k取的常见值),直接读取这段数据,然后忽略.

BlockReaderLocal用来直接以文件形式读取在本机的副本,203策略:通过流, DataXceiver.

读取都是采用

read1方法每次读取一定量的数据,然后根据返回的字节数,在做操作.

NN:ReplicationMonitor

    负责检查block的副本,选择源dn和目的地dn,选择策略和写入时候一致,加入将复制信息加入到NN中,NN在下次dn心跳的时候将命令传递给dn执行.

安全模式

    命名空间只读,不进行副本复制.

启动时候自动进入安全模式. 

手动进入,bin/hadoop dfsadmin  –safemode 此时如果有文件正在写会出错(和NN通信时候)

保存namespace

    SafeModeMonitor

    安全的block:副本数大于dfs.replication.min

    当安全的block数目占比大于dfs.safemode.threshold.pct离开安全模式.

数据重新平衡策略

start-balancer.sh  –threshold   5

dfs.balance.bandwidthPerSec 占用带宽

如图中界限,将误差上限右部的向左边移动block.

优先选择负载小的,优先选择同一机架的.

对于多机架,可能会影响,块分布策略.

扩容和退服

扩容:hadoop-daemon.sh  start  datanode

退服: 将dfs.hosts.exclude对应的文件中的DN取出.

    如果其中所有的块都达到了副本数,退役,否则等待.

DecommissionManager$Monitor

当满足退役条件后,在dn心跳的时候就发送一个退役命令dn关闭自己.

EditLogOutputStream

抽象类,保证

    1, setReadyToFlush时刻已经存在的数据将被flush,当flush的同时,可以继续写入流.

    2,flush后保证数据已经持久化.

BackupNode

EditLogOutputStream

通用的抽象类将编辑日志存储到持久储存器.

实现了JournalProtocol协议,当活动的NameNode产生日志的时候会调用这个方法将日志传给BN

机架感知

#!/bin/bash

while [ $# -ne 0 ]

do

case $1 in

#机架1

192.168.2.240|192.168.2.241|192.168.2.242 ) echo "/sw1/rack1";;

#机架2

192.168.2.243|192.168.2.244|192.168.2.245 ) echo "/sw1/rack2";;

* ) echo "/unknownrack" ;;

esac

shift

done

#测试语句

#sh dns.sh 192.168.2.241  192.168.2.242 192.168.2.245 234下载本文

显示全文
专题