# Source Code Analysis: NN Startup (Part 3)
Published 2021-07-31
![](/static/images/article_images/1693147767.43787.png)

A [previous article](https://libins.cn/detail/moduleName=hadoop&articleId=cc53c0a1248a9af24de753a3a5882128) showed that NameNode startup consists of the following 7 steps:

| Step | Description |
| ---- | ----------- |
| 1 | Initialize the NameNode object |
| 2 | Start the HTTP service on port 9870 |
| 3 | Load the image file (fsimage) and the edit log |
| 4 | Initialize the NameNode RPC server |
| 5 | Run the NameNode resource check |
| 6 | Check DataNode heartbeats for timeouts |
| 7 | Enter safe mode |

This article walks through the key source code of steps 6 and 7; steps 1 through 5 are covered in the [last article](https://libins.cn/detail/moduleName=hadoop&articleId=1cf8a07934cbabfd8c194523cfbbb4c8).

#### DataNode Heartbeat Timeout Check

Press Ctrl + Shift + F, search for `datanodeManager.activate`, and open `DatanodeManager.java`:

```java
void activate(final Configuration conf) {
  datanodeAdminManager.activate(conf);
  heartbeatManager.activate();
}
```

Click into the `activate()` method of `HeartbeatManager`, which starts the monitor thread whose `run()` method drives the periodic check:

```java
void activate() {
  // start the heartbeat monitor thread; its run() method is below
  heartbeatThread.start();
}

public void run() {
  while (namesystem.isRunning()) {
    restartHeartbeatStopWatch();
    try {
      final long now = Time.monotonicNow();
      if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
        // perform the heartbeat check
        heartbeatCheck();
        lastHeartbeatCheck = now;
      }
      if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
        synchronized (HeartbeatManager.this) {
          for (DatanodeDescriptor d : datanodes) {
            d.setNeedKeyUpdate(true);
          }
        }
        lastBlockKeyUpdate = now;
      }
    } catch (Exception e) {
      LOG.error("Exception while checking heartbeat", e);
    }
    try {
      Thread.sleep(5000);  // 5 seconds
    } catch (InterruptedException ignored) {
    }
    // avoid declaring nodes dead for another cycle if a GC pause lasts
    // longer than the node recheck interval
    if (shouldAbortHeartbeatCheck(-5000)) {
      LOG.warn("Skipping next heartbeat scan due to excessive pause");
      lastHeartbeatCheck = Time.monotonicNow();
    }
  }
}
```

`heartbeatCheck()` scans all registered DataNodes, flagging dead and stale nodes:

```java
void heartbeatCheck() {
  final DatanodeManager dm = blockManager.getDatanodeManager();
  boolean allAlive = false;
  while (!allAlive) {
    // locate the first dead node
    DatanodeDescriptor dead = null;
    // locate the first failed storage that is not on a dead node
    DatanodeStorageInfo failedStorage = null;
    // count the stale nodes and storages
    int numOfStaleNodes = 0;
    int numOfStaleStorages = 0;
    synchronized (this) {
      for (DatanodeDescriptor d : datanodes) {
        // check if an excessive GC pause has occurred
        if (shouldAbortHeartbeatCheck(0)) {
          return;
        }
        // check whether this DataNode is dead (heartbeat expired)
        if (dead == null && dm.isDatanodeDead(d)) {
          stats.incrExpiredHeartbeats();
          dead = d;
        }
        if (d.isStale(dm.getStaleInterval())) {
          numOfStaleNodes++;
        }
        DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
        for (DatanodeStorageInfo storageInfo : storageInfos) {
          if (storageInfo.areBlockContentsStale()) {
            numOfStaleStorages++;
          }
          if (failedStorage == null
              && storageInfo.areBlocksOnFailedStorage()
              && d != dead) {
            failedStorage = storageInfo;
          }
        }
      }
      // set the number of stale nodes in the DatanodeManager
      dm.setNumStaleNodes(numOfStaleNodes);
      dm.setNumStaleStorages(numOfStaleStorages);
    }
    allAlive = dead == null && failedStorage == null;
    // ... (removal of the dead node and the failed storage elided) ...
  }
}
```

A node counts as dead when its last heartbeat is older than `heartbeatExpireInterval`, which is derived from two configuration values:

```java
boolean isDatanodeDead(DatanodeDescriptor node) {
  return (node.getLastUpdateMonotonic() <
      (monotonicNow() - heartbeatExpireInterval));
}

private long heartbeatExpireInterval;
// 10 minutes + 30 seconds by default
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
    + 10 * 1000 * heartbeatIntervalSeconds;

private volatile int heartbeatRecheckInterval;
heartbeatRecheckInterval = conf.getInt(
    DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
    DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes

private volatile long heartbeatIntervalSeconds;
heartbeatIntervalSeconds = conf.getTimeDuration(
    DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
    DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3; // 3 seconds
```
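To make the timeout arithmetic concrete, here is a minimal standalone sketch of the same formula with the default configuration values (the class name and `main` harness are mine, purely illustrative; the formula and defaults come from the source above):

```java
/** Minimal sketch of the dead-node timeout arithmetic; illustrative, not Hadoop source. */
public class HeartbeatExpiryDemo {
  public static void main(String[] args) {
    long heartbeatRecheckIntervalMs = 5 * 60 * 1000; // dfs.namenode.heartbeat.recheck-interval, default 5 min
    long heartbeatIntervalSeconds = 3;               // dfs.heartbeat.interval, default 3 s

    // same formula as HeartbeatManager: 2 * recheck + 10 * heartbeat
    long heartbeatExpireIntervalMs =
        2 * heartbeatRecheckIntervalMs + 10 * 1000 * heartbeatIntervalSeconds;

    // prints 630.0 s, i.e. 10 minutes + 30 seconds: a DataNode whose last
    // heartbeat is older than this is declared dead by isDatanodeDead()
    System.out.println("Expire interval: " + heartbeatExpireIntervalMs / 1000.0 + " s");
  }
}
```

Tuning either `dfs.namenode.heartbeat.recheck-interval` or `dfs.heartbeat.interval` therefore shifts how quickly the NameNode declares a DataNode dead.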
#### Safe Mode

Press Ctrl + Shift + F, search for `startCommonServices`, and open `FSNamesystem.java`:

```java
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // check that there is enough disk space for the metadata
    // (fsimage and edit log each reserve 100 MB by default)
    checkAvailableResources();
    assert !blockManager.isPopulatingReplQueues();
    StartupProgress prog = NameNode.getStartupProgress();
    // begin entering safe mode
    prog.beginPhase(Phase.SAFEMODE);
    // count the blocks that are complete and usable
    long completeBlocksTotal = getCompleteBlocksTotal();
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);
    // start the block manager services
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }

  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}
```
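The resource check in `checkAvailableResources()` boils down to comparing the free space on each metadata volume against a reserved threshold, 100 MB by default (`dfs.namenode.resource.du.reserved`). A simplified illustration of that idea (my own sketch, not the actual `NameNodeResourceChecker` implementation):

```java
import java.io.File;

/** Simplified sketch of the NameNode disk-resource check; illustrative, not Hadoop source. */
public class ResourceCheckDemo {
  // dfs.namenode.resource.du.reserved, default 100 MB
  static final long DU_RESERVED_BYTES = 100L * 1024 * 1024;

  /** True if the volume holding the metadata directory still has enough free space. */
  static boolean hasAvailableDiskSpace(File metadataDir) {
    return metadataDir.getUsableSpace() > DU_RESERVED_BYTES;
  }

  public static void main(String[] args) {
    File nameDir = new File("/tmp"); // stand-in for a dfs.namenode.name.dir volume
    System.out.println("Enough space for fsimage/edits: "
        + hasAvailableDiskSpace(nameDir));
  }
}
```

When the real check fails, the NameNode enters safe mode rather than risk corrupting its metadata.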
Click into `getCompleteBlocksTotal`:

```java
public long getCompleteBlocksTotal() {
  // calculate the number of blocks under construction
  long numUCBlocks = 0;
  readLock();
  try {
    // blocks still being written
    numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
    // total blocks - blocks under construction = complete, usable blocks
    return getBlocksTotal() - numUCBlocks;
  } finally {
    readUnlock("getCompleteBlocksTotal");
  }
}
```

Back in `startCommonServices`, click into `blockManager.activate`:

```java
public void activate(Configuration conf, long blockTotal) {
  pendingReconstruction.start();
  datanodeManager.activate(conf);
  this.redundancyThread.setName("RedundancyMonitor");
  this.redundancyThread.start();
  storageInfoDefragmenterThread.setName("StorageInfoMonitor");
  storageInfoDefragmenterThread.start();
  this.blockReportThread.start();
  mxBeanName = MBeans.register("NameNode", "BlockStats", this);
  bmSafeMode.activate(blockTotal);
}
```

Click into `bmSafeMode.activate`:

```java
void activate(long total) {
  assert namesystem.hasWriteLock();
  assert status == BMSafeModeStatus.OFF;

  startTime = monotonicNow();
  // compute the block-count threshold for leaving safe mode
  setBlockTotal(total);
  // check whether the DataNode count and block count already meet the exit criteria
  if (areThresholdsMet()) {
    boolean exitResult = leaveSafeMode(false);
    Preconditions.checkState(exitResult, "Failed to leave safe mode.");
  } else {
    // enter safe mode
    status = BMSafeModeStatus.PENDING_THRESHOLD;
    initializeReplQueuesIfNecessary();
    reportStatus("STATE* Safe mode ON.", true);
    lastStatusReport = monotonicNow();
  }
}
```

Click into `setBlockTotal`:

```java
void setBlockTotal(long total) {
  assert namesystem.hasWriteLock();
  synchronized (this) {
    this.blockTotal = total;
    // compute the threshold, e.g. 1000 complete blocks * 0.999 = 999
    this.blockThreshold = (long) (total * threshold);
  }
  this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
}

// where the threshold percentage comes from:
this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
    DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
public static final float DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;
```

Click into `areThresholdsMet`:

```java
private boolean areThresholdsMet() {
  assert namesystem.hasWriteLock();
  // Calculating the number of live datanodes is time-consuming
  // in large clusters. Skip it when datanodeThreshold is zero.
  int datanodeNum = 0;
  if (datanodeThreshold > 0) {
    datanodeNum = blockManager.getDatanodeManager().getNumLiveDataNodes();
  }
  synchronized (this) {
    // safely reported blocks >= block threshold,
    // and live DataNodes >= minimum required DataNodes
    return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;
  }
}
```
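Putting the last two methods together, leaving safe mode is a plain threshold comparison. Here is a minimal standalone sketch of that decision with the default values (the class name and sample numbers are mine; the `dfs.namenode.safemode.min.datanodes` key is an assumption from Hadoop's defaults, the rest follows the source above):

```java
/** Sketch of the safe-mode exit decision; illustrative, not Hadoop source. */
public class SafeModeThresholdDemo {
  public static void main(String[] args) {
    long blockTotal = 1000;    // complete blocks known to the NameNode
    float threshold = 0.999f;  // dfs.namenode.safemode.threshold-pct, default 0.999
    int datanodeThreshold = 0; // dfs.namenode.safemode.min.datanodes, default 0

    // same calculation as setBlockTotal(): 1000 * 0.999 = 999
    long blockThreshold = (long) (blockTotal * threshold);

    long blockSafe = 999;  // blocks already reported by DataNodes
    int liveDatanodes = 3;

    // same check as areThresholdsMet()
    boolean canLeaveSafeMode =
        blockSafe >= blockThreshold && liveDatanodes >= datanodeThreshold;
    System.out.println("Leave safe mode: " + canLeaveSafeMode); // true
  }
}
```

So with the default 0.999 threshold, the NameNode exits safe mode once 99.9% of the complete blocks it knows about have been reported by DataNodes; until then the namespace stays read-only.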