Sometimes we need to add HA to a running cluster. Taking a Hadoop 2.4.1 cluster as an example, I will list the steps to set up quorum-journal-based name node HA and resource manager HA.
Back Up the HDFS Edit Files and fsimage Files
Before we do the upgrade, we need to back up the HDFS edit files and fsimage files. It is possible to use the following curl command to download the fsimage file from a running cluster, where 54.226.111.204 is the name node IP.
$ curl -o fsimage.test "http://54.226.111.204:9101/imagetransfer?getimage=1&txid=latest"
But be aware that the fsimage file only contains the metadata as of the last name node startup, since the NameNode merges the fsimage and edits files only during startup, as stated in the Hadoop documentation:
The NameNode stores modifications to the file system as a log appended to a native file system file, edits. When a NameNode starts up, it reads HDFS state from an image file, fsimage, and then applies edits from the edits log file. It then writes new HDFS state to the fsimage and starts normal operation with an empty edits file. Since NameNode merges fsimage and edits files only during start up, the edits log file could get very large over time on a busy cluster. Another side effect of a larger edits file is that next restart of NameNode takes longer.
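If you want the downloaded fsimage to be current, one option is to force a checkpoint first with dfsadmin; a minimal sketch (saveNamespace requires safe mode, so writes are briefly blocked):

$ hdfs dfsadmin -safemode enter
$ hdfs dfsadmin -saveNamespace
$ hdfs dfsadmin -safemode leave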
To be safe, we need to shut down all the Hadoop daemons and then back up the whole name node directory, including the edit files and the fsimage files.
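Assuming the cluster was started with the stock Hadoop 2.4.1 scripts, a sketch to stop everything:

$ sbin/stop-yarn.sh
$ sbin/stop-dfs.sh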
From my observation, the standby name node cannot serve any read/write operations. If an HDFS client still points to the old name node after it becomes the standby, you will see tons of warning messages like the following.
2014-09-25 06:18:24,247 INFO org.apache.hadoop.ipc.Server (IPC Server handler 63 on 9000): IPC Server handler 63 on 9000, call org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations from 10.61.147.15:58693 Call#1 Retry#0: error: org.apache.hadoop.ipc.StandbyException: Operation category READ is not supported in state standby
After that, tar and gzip one of the metadata folders (assume it is /mnt/var/lib/hadoop/dfs-name) and upload the archive to a designated place to back up the files.
$ tree /mnt/var/lib/hadoop/dfs-name/
/mnt/var/lib/hadoop/dfs-name/
├── current
│   ├── edits_0000000000000000001-0000000000000025667
│   ├── edits_0000000000000025668-0000000000000027061
│   ├── edits_0000000000000027062-0000000000000027828
......
│   ├── edits_inprogress_0000000000000030491
│   ├── fsimage_0000000000000064526
│   ├── fsimage_0000000000000064526.md5
│   ├── fsimage_0000000000000065939
│   ├── fsimage_0000000000000065939.md5
│   ├── seen_txid
│   └── VERSION
└── in_use.lock

1 directory, 50 files
One sample backup script is listed here.
#!/usr/bin/env bash
echo "force the name node to enter safemode"
hdfs dfsadmin -safemode enter
echo "go to one of the meta file folders"
cd /mnt/var/lib/hadoop/
echo "pack the meta files"
tar zcvf dfs-name.tar.gz dfs-name
echo "upload the backup file to s3"
hadoop fs -copyFromLocal dfs-name.tar.gz s3://my_s3_bucket
echo "remove the backup file"
rm dfs-name.tar.gz
echo "leave the safemode"
hdfs dfsadmin -safemode leave
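Should a restore ever be needed, the reverse works; a sketch assuming the same paths and a stopped name node:

$ hadoop fs -copyToLocal s3://my_s3_bucket/dfs-name.tar.gz /mnt/var/lib/hadoop/
$ cd /mnt/var/lib/hadoop/
$ tar zxvf dfs-name.tar.gz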
Configure Name Node HA
To configure name node HA, we need to select 3 nodes in the cluster for the journal quorum, the zookeeper quorum, and the two name nodes. Here, I pick the original name node ip-10-235-3-184 plus two other nodes, ip-10-16-135-195 and ip-10-61-147-15; ip-10-16-135-195 will serve as the second name node.

Update hadoop configurations on the 3 selected nodes
Remove the following line from core-site.xml
<property><name>fs.default.name</name><value>hdfs://10.235.3.184:9000</value></property>
and add the following line
<property><name>fs.defaultFS</name><value>hdfs://mycluster</value></property>
After that, add the following lines to hdfs-site.xml
<property><name>dfs.nameservices</name><value>mycluster</value></property>
<property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
<property><name>ha.zookeeper.quorum</name><value>ip-10-235-3-184:2181,ip-10-16-135-195:2181,ip-10-61-147-15:2181</value></property>
<property><name>dfs.journalnode.edits.dir</name><value>/mnt/var/lib/hadoop/journal/local/data</value></property>
<property><name>dfs.namenode.shared.edits.dir</name><value>qjournal://ip-10-235-3-184:8485;ip-10-16-135-195:8485;ip-10-61-147-15:8485/mycluster</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>ip-10-235-3-184:9000</value></property>
<property><name>dfs.namenode.http-address.mycluster.nn1</name><value>ip-10-235-3-184:9101</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>ip-10-16-135-195:9000</value></property>
<property><name>dfs.namenode.http-address.mycluster.nn2</name><value>ip-10-16-135-195:9101</value></property>
<property><name>dfs.ha.fencing.methods</name><value>shell(/bin/true)</value></property>
<property><name>dfs.ha.automatic-failover.enabled</name><value>true</value></property>
<property><name>dfs.client.failover.proxy.provider.mycluster</name><value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property>
The dfs.ha.fencing.methods value could be replaced with a shell command that actually kills the old active name node, but we leave it as shell(/bin/true) for now.
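Before restarting any daemon, hdfs getconf can confirm that the new keys resolve as expected; for example, the following should print hdfs://mycluster and nn1,nn2 respectively.

$ hdfs getconf -confKey fs.defaultFS
$ hdfs getconf -confKey dfs.ha.namenodes.mycluster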
set up the zookeeper quorum on the 3 selected nodes
On the node ip-10-235-3-184, run the following steps to set up zookeeper.
$ wget http://apache.claz.org/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
$ tar zxvf zookeeper-3.4.6.tar.gz
$ ln -s zookeeper-3.4.6 zookeeper
$ mkdir -p /mnt/var/lib/zookeeper
$ echo "1" > /mnt/var/lib/zookeeper/myid
$ cd zookeeper/conf
$ cp zoo_sample.cfg zoo.cfg
Edit zoo.cfg so that its content looks like the following.
$ cat ~/zookeeper/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/mnt/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60
server.1=ip-10-235-3-184:2888:3888
server.2=ip-10-16-135-195:2888:3888
server.3=ip-10-61-147-15:2888:3888
Do the same on nodes ip-10-16-135-195 and ip-10-61-147-15; the only difference is the content of the myid file.
On ip-10-16-135-195, run the following command
$ echo "2" > /mnt/var/lib/zookeeper/myid
On ip-10-61-147-15, set myid to "3" instead.
$ echo "3" > /mnt/var/lib/zookeeper/myid
start the zookeeper quorum on the 3 selected nodes
$ cd ~/zookeeper/bin/
$ ./zkServer.sh start
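Once all 3 servers are started, verify that the quorum formed: zkServer.sh status should report one leader and two followers, and the ruok four-letter command should answer imok (assuming nc is available).

$ ./zkServer.sh status
$ echo ruok | nc localhost 2181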
start the journal quorum on the 3 selected nodes
$ sbin/hadoop-daemon.sh start journalnode
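To confirm each journal node is up, check jps for a JournalNode process; the journal node also serves an HTTP status page, by default on port 8480.

$ jps | grep JournalNode
$ curl -I http://localhost:8480/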
initialize journal quorum
On name node ip-10-235-3-184, run the following command.
$ hdfs namenode -initializeSharedEdits -force
format the zkfc state in zookeeper

This only needs to be done once. On one of the name nodes, for example ip-10-235-3-184, run the following command.
$ hdfs zkfc -formatZK -force
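The format step creates an HA znode in zookeeper (by default /hadoop-ha/<nameservice>); you can verify it from any of the 3 nodes with zkCli.sh, which should list mycluster.

$ ~/zookeeper/bin/zkCli.sh -server ip-10-235-3-184:2181 ls /hadoop-ha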
bootstrap the second name node
On the second name node host ip-10-16-135-195, run the following command.
$ hdfs namenode -bootstrapStandby -force
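To confirm the bootstrap copied over the namespace, list the metadata directory on ip-10-16-135-195 (same path as in the backup section); it should now contain a recent fsimage.

$ ls /mnt/var/lib/hadoop/dfs-name/current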
start zkfc service on the two name nodes
The ZKFailoverController (ZKFC) is a new component: a ZooKeeper client that monitors and manages the state of the name node. Each machine that runs a name node also runs a ZKFC.
On the first name node ip-10-235-3-184 and the second name node ip-10-16-135-195, run the following command.
$ sbin/hadoop-daemon.sh start zkfc
start name nodes
$ sbin/hadoop-daemon.sh start namenode
start data nodes
$ sbin/hadoop-daemon.sh start datanode
After finishing all the above steps, run the following commands to verify that HDFS works and the data is still there; one name node should report active and the other standby.
$ hdfs dfsadmin -report
$ hdfs haadmin -getServiceState nn1
$ hdfs haadmin -getServiceState nn2
Configure Resource Manager HA
Remove all the resource manager address-related configuration from yarn-site.xml, such as yarn.resourcemanager.webapp.address, yarn.resourcemanager.admin.address, yarn.resourcemanager.address, yarn.resourcemanager.resource-tracker.address, and yarn.resourcemanager.scheduler.address. Then add the following parameters to yarn-site.xml.
<property><name>yarn.resourcemanager.ha.enabled</name><value>true</value></property>
<property><name>yarn.resourcemanager.cluster-id</name><value>mycluster</value></property>
<property><name>yarn.resourcemanager.ha.rm-ids</name><value>rm1,rm2</value></property>
<property><name>yarn.resourcemanager.hostname.rm1</name><value>ip-10-235-3-184</value></property>
<property><name>yarn.resourcemanager.hostname.rm2</name><value>ip-10-16-135-195</value></property>
<property><name>yarn.resourcemanager.zk-address</name><value>ip-10-235-3-184:2181,ip-10-16-135-195:2181,ip-10-61-147-15:2181</value></property>
<property><name>yarn.resourcemanager.webapp.address.rm1</name><value>ip-10-235-3-184:9026</value></property>
<property><name>yarn.resourcemanager.webapp.address.rm2</name><value>ip-10-16-135-195:9026</value></property>
<property><name>yarn.resourcemanager.recovery.enabled</name><value>true</value></property>
<property><name>yarn.resourcemanager.store.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value></property>
<property><name>yarn.client.failover-proxy-provider</name><value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value></property>
After that, start the resource manager on both ip-10-235-3-184 and ip-10-16-135-195.
$ sbin/yarn-daemon.sh start resourcemanager
Start the job history server, the web proxy server, and all the node managers so that they pick up the configuration changes in core-site.xml and yarn-site.xml, as sketched below.
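With the stock Hadoop 2.4.1 scripts, that looks like the following; run the node manager command on every slave node.

$ sbin/mr-jobhistory-daemon.sh start historyserver
$ sbin/yarn-daemon.sh start proxyserver
$ sbin/yarn-daemon.sh start nodemanager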
We can use the following commands to get resource manager HA state.
$ yarn rmadmin -getServiceState rm1
active
$ yarn rmadmin -getServiceState rm2
standby
Test Name Node HA and Resource Manager HA
Test the cluster by running teragen, terasort, and teravalidate. (teragen's first argument is the number of 100-byte rows, so 10000000000 rows generates roughly 1 TB of data.)
$ hadoop jar hadoop-examples.jar teragen 10000000000 /tera/input
$ hadoop jar hadoop-examples.jar terasort /tera/input /tera/output
$ hadoop jar hadoop-examples.jar teravalidate /tera/output /tera/report
Test name node failover
We can kill the active name node while an MR job is running to test the name node failover.
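One way to sketch this test: check which name node is active, kill the NameNode process on that host, and confirm the standby takes over (the jps-based kill assumes one NameNode process per host).

$ hdfs haadmin -getServiceState nn1
# on the active name node host
$ kill -9 $(jps | grep -w NameNode | awk '{print $1}')
# the standby should report active shortly afterwards
$ hdfs haadmin -getServiceState nn2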
Test resource manager failover
We can kill the active resource manager while an MR job is running to test the resource manager failover.
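A similar sketch works for the resource manager; the kill again assumes one ResourceManager process per host.

$ yarn rmadmin -getServiceState rm1
# on the active resource manager host
$ kill -9 $(jps | grep -w ResourceManager | awk '{print $1}')
# the standby should report active shortly afterwards
$ yarn rmadmin -getServiceState rm2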