# Source Code Analysis: NameNode Startup (Part 2)
2021-07-25 19:40:24, Sunday
![](/static/images/article_images/1693147767.43787.png)

[The previous article](https://libins.cn/detail/moduleName=hadoop&articleId=cc53c0a1248a9af24de753a3a5882128) showed that NameNode startup consists of the following seven steps:

| No. | Startup step |
| --- | --- |
| 1 | Initialize the NameNode object |
| 2 | Start the HTTP server on port 9870 |
| 3 | Load the fsimage and edit log |
| 4 | Initialize the NameNode's RPC server |
| 5 | NameNode startup resource check |
| 6 | DataNode heartbeat timeout check |
| 7 | Enter safe mode |

This article walks through the key source code of the first five steps; the last two are covered in [the next article](https://libins.cn/detail/moduleName=hadoop&articleId=c18a4dd3efcf189d529bd515cd6d0e68).

#### Starting the HTTP server on port 9870

The startHttpServer method in NameNode.java:

```java
private void startHttpServer(final Configuration conf) throws IOException {
  httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
  httpServer.start(); // follow this call
  httpServer.setStartupProgress(startupProgress);
}

protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
  InetSocketAddress bindAddress = getHttpServerAddress(conf);
  ...
  return bindAddress;
}

protected InetSocketAddress getHttpServerAddress(Configuration conf) {
  return getHttpAddress(conf);
}

public static InetSocketAddress getHttpAddress(Configuration conf) {
  return NetUtils.createSocketAddr(
      conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}

// in DFSConfigKeys: the default address resolves to 0.0.0.0:9870
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT =
    "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT =
    HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;

// in HdfsClientConfigKeys:
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
```

Follow the httpServer.start() call into NameNodeHttpServer.java:

```java
void start() throws IOException {
  ...
  // Hadoop wraps the stock HttpServer in its own HttpServer2
  HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
      httpAddr, httpsAddr, "hdfs",
      DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
  ...
  httpServer = builder.build();
  ...
  httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
  httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
  setupServlets(httpServer, conf);
  httpServer.start();
  ...
}
```

Follow the setupServlets call above:

```java
private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
  httpServer.addInternalServlet("startupProgress",
      StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
  httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true);
  httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
      ImageServlet.class, true);
}
```
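The address resolution above uses only public Hadoop APIs, so it can be reproduced outside the NameNode. Below is a minimal sketch written for this article (the class name HttpAddressDemo is invented); it assumes a stock Configuration with no hdfs-site.xml overrides:

```java
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

public class HttpAddressDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // dfs.namenode.http-address falls back to 0.0.0.0:9870 when unset,
    // exactly as getHttpAddress() does above
    String addr = conf.getTrimmed("dfs.namenode.http-address", "0.0.0.0:9870");
    InetSocketAddress bind = NetUtils.createSocketAddr(addr);
    System.out.println(bind.getHostName() + ":" + bind.getPort()); // 0.0.0.0:9870
  }
}
```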
#### Loading the fsimage and edit log

Search for loadNamesystem in NameNode.java:

```java
protected void loadNamesystem(Configuration conf) throws IOException {
  this.namesystem = FSNamesystem.loadFromDisk(conf);
}

static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
  checkConfiguration(conf);
  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);
  if (startOpt == StartupOption.RECOVER) {
    namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }
  long loadStart = monotonicNow();
  try {
    namesystem.loadFSImage(startOpt);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  long timeTakenToLoadFSImage = monotonicNow() - loadStart;
  LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
  NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
  if (nnMetrics != null) {
    nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
  }
  namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
  return namesystem;
}
```

#### Initializing the NameNode's RPC server

Search for createRpcServer in NameNode.java:

```java
protected NameNodeRpcServer createRpcServer(Configuration conf) throws IOException {
  return new NameNodeRpcServer(conf, this);
}
```

Follow NameNodeRpcServer on the second line into NameNodeRpcServer.java:

```java
public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException {
  ...
  serviceRpcServer = new RPC.Builder(conf)
      .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
      .setInstance(clientNNPbService)
      .setBindAddress(bindHost)
      .setPort(serviceRpcAddr.getPort())
      .setNumHandlers(serviceHandlerCount)
      .setVerbose(false)
      .setSecretManager(namesystem.getDelegationTokenSecretManager())
      .build();
  ...
}
```
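The RPC.Builder chain above is Hadoop's generic IPC API rather than anything NameNode-specific, so the same pattern can be exercised with a toy protocol. The sketch below is my own illustration: EchoProtocol, EchoServer, and port 8888 are all invented, and the legacy VersionedProtocol route is used to keep the example short (the real NameNodeRpcServer goes through protobuf translator services instead):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;

public class EchoRpcDemo {
  // a toy protocol standing in for ClientNamenodeProtocolPB
  public interface EchoProtocol extends VersionedProtocol {
    long versionID = 1L;
    String echo(String msg) throws IOException;
  }

  public static class EchoServer implements EchoProtocol {
    @Override
    public String echo(String msg) { return "echo: " + msg; }
    @Override
    public long getProtocolVersion(String protocol, long clientVersion) {
      return versionID;
    }
    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
        long clientVersion, int clientMethodsHash) throws IOException {
      return ProtocolSignature.getProtocolSignature(this, protocol,
          clientVersion, clientMethodsHash);
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // same builder chain as NameNodeRpcServer, minus the secret manager
    RPC.Server server = new RPC.Builder(conf)
        .setProtocol(EchoProtocol.class)
        .setInstance(new EchoServer())
        .setBindAddress("0.0.0.0")
        .setPort(8888)
        .setNumHandlers(2)
        .setVerbose(false)
        .build();
    server.start();

    EchoProtocol client = RPC.getProxy(EchoProtocol.class,
        EchoProtocol.versionID, new InetSocketAddress("localhost", 8888), conf);
    System.out.println(client.echo("hello")); // prints "echo: hello"
    RPC.stopProxy(client);
    server.stop();
  }
}
```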
#### NameNode startup resource check

Search for startCommonServices in NameNode.java:

```java
private void startCommonServices(Configuration conf) throws IOException {
  namesystem.startCommonServices(conf, haContext);
  registerNNSMXBean();
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  rpcServer.start();
  try {
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, ServicePlugin.class);
  } catch (RuntimeException e) {
    String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
    LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
        pluginsValue, e);
    throw e;
  }
  ...
}
```

Follow the startCommonServices call on the second line into FSNamesystem.java:

```java
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // check that there is enough disk space for the metadata
    // (fsimage and edit log directories each need 100 MB by default)
    checkAvailableResources();
    assert !blockManager.isPopulatingReplQueues();
    StartupProgress prog = NameNode.getStartupProgress();
    prog.beginPhase(Phase.SAFEMODE);
    long completeBlocksTotal = getCompleteBlocksTotal();
    // safe mode
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);
    // start the block manager services
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}
```

Follow NameNodeResourceChecker into NameNodeResourceChecker.java:

```java
public NameNodeResourceChecker(Configuration conf) throws IOException {
  this.conf = conf;
  volumes = new HashMap<String, CheckedVolume>();

  // dfs.namenode.resource.du.reserved, default 1024 * 1024 * 100 = 100 MB
  duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
      DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);

  Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(
      conf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));

  Collection<URI> localEditDirs = Collections2.filter(
      FSNamesystem.getNamespaceEditsDirs(conf),
      new Predicate<URI>() {
        @Override
        public boolean apply(URI input) {
          if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
            return true;
          }
          return false;
        }
      });

  // register every local edits directory for resource checking
  for (URI editsDirToCheck : localEditDirs) {
    addDirToCheck(editsDirToCheck,
        FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(editsDirToCheck));
  }

  // All extra checked volumes are marked "required"
  for (URI extraDirToCheck : extraCheckedVolumes) {
    addDirToCheck(extraDirToCheck, true);
  }

  minimumRedundantVolumes = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
}
```
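The constructor above only registers directories; the actual free-space comparison happens later inside each CheckedVolume, which uses Hadoop's DF utility. As a rough illustration of the idea, the sketch below approximates the per-volume check with plain java.io.File; the class name DiskSpaceCheckDemo and the use of getUsableSpace() are my own simplification, not the NameNode's code:

```java
import java.io.File;
import org.apache.hadoop.conf.Configuration;

public class DiskSpaceCheckDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // dfs.namenode.resource.du.reserved: 100 MB unless overridden
    long duReserved = conf.getLong("dfs.namenode.resource.du.reserved",
        1024L * 1024 * 100);
    // a volume counts as "available" while its free space stays above duReserved
    File dir = new File(args.length > 0 ? args[0] : "/tmp");
    long usable = dir.getUsableSpace();
    System.out.printf("%s: %d bytes free, reserved %d -> %s%n",
        dir, usable, duReserved,
        usable > duReserved ? "available" : "resource low");
  }
}
```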
Follow checkAvailableResources back in FSNamesystem.java:

```java
void checkAvailableResources() {
  long resourceCheckTime = monotonicNow();
  Preconditions.checkState(nnResourceChecker != null,
      "nnResourceChecker not initialized");
  // returns false if the available disk space is below the threshold
  hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
  resourceCheckTime = monotonicNow() - resourceCheckTime;
  NameNode.getNameNodeMetrics().addResourceCheckTime(resourceCheckTime);
}
```

Follow hasAvailableDiskSpace:

```java
public boolean hasAvailableDiskSpace() {
  return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
      minimumRedundantVolumes);
}
```

Follow areResourcesAvailable into NameNodeResourcePolicy.java:

```java
static boolean areResourcesAvailable(
    Collection<? extends CheckableNameNodeResource> resources,
    int minimumRedundantResources) {

  // TODO: workaround:
  // - during startup, if there are no edits dirs on disk, then there is
  // a call to areResourcesAvailable() with no dirs at all, which was
  // previously causing the NN to enter safemode
  if (resources.isEmpty()) {
    return true;
  }

  int requiredResourceCount = 0;
  int redundantResourceCount = 0;
  int disabledRedundantResourceCount = 0;
  // classify each volume and check whether it still has space
  for (CheckableNameNodeResource resource : resources) {
    if (!resource.isRequired()) {
      redundantResourceCount++;
      if (!resource.isResourceAvailable()) {
        disabledRedundantResourceCount++;
      }
    } else {
      requiredResourceCount++;
      if (!resource.isResourceAvailable()) {
        // Short circuit - a required resource is not available, so return false
        return false;
      }
    }
  }

  if (redundantResourceCount == 0) {
    // If there are no redundant resources, return true if there are any
    // required resources available.
    return requiredResourceCount > 0;
  } else {
    return redundantResourceCount - disabledRedundantResourceCount >=
        minimumRedundantResources;
  }
}

interface CheckableNameNodeResource {
  public boolean isResourceAvailable();
  public boolean isRequired();
}
```
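To make the required-vs-redundant semantics concrete, here is a small self-contained toy that mirrors the decision logic above. ResourcePolicyDemo, Res, and StubVolume are invented for this article, not Hadoop code:

```java
import java.util.Arrays;
import java.util.Collection;

public class ResourcePolicyDemo {
  // stand-in for CheckableNameNodeResource
  interface Res {
    boolean isRequired();
    boolean isAvailable();
  }

  static class StubVolume implements Res {
    final boolean required, available;
    StubVolume(boolean required, boolean available) {
      this.required = required;
      this.available = available;
    }
    public boolean isRequired() { return required; }
    public boolean isAvailable() { return available; }
  }

  // same decision logic as NameNodeResourcePolicy.areResourcesAvailable()
  static boolean areResourcesAvailable(Collection<? extends Res> resources,
      int minimumRedundant) {
    if (resources.isEmpty()) {
      return true; // startup workaround: no dirs at all counts as available
    }
    int required = 0, redundant = 0, disabledRedundant = 0;
    for (Res r : resources) {
      if (!r.isRequired()) {
        redundant++;
        if (!r.isAvailable()) {
          disabledRedundant++;
        }
      } else {
        required++;
        if (!r.isAvailable()) {
          return false; // an unavailable required volume fails the whole check
        }
      }
    }
    if (redundant == 0) {
      return required > 0;
    }
    return redundant - disabledRedundant >= minimumRedundant;
  }

  public static void main(String[] args) {
    // one required edits dir with space, two redundant dirs, one of them full
    Collection<Res> volumes = Arrays.<Res>asList(
        new StubVolume(true, true),
        new StubVolume(false, true),
        new StubVolume(false, false));
    // 2 redundant - 1 disabled >= 1 minimum, and the required dir is fine
    System.out.println(areResourcesAvailable(volumes, 1)); // true
    // a full *required* dir short-circuits to false regardless of the rest
    System.out.println(areResourcesAvailable(
        Arrays.<Res>asList(new StubVolume(true, false)), 1)); // false
  }
}
```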