Sometimes we need to add HA to a running cluster. Taking a Hadoop 2.4.1 cluster as an example, I will list the steps to set up quorum-journal-based name node HA and resource manager HA.

Back Up the HDFS Edits and fsimage Files


Before we make any changes, we need to back up the HDFS edits files and fsimage files. The following curl command can download the fsimage file from a running cluster, where 54.226.111.204 is the name node IP.

$ curl -o fsimage.test "http://54.226.111.204:9101/imagetransfer?getimage=1&txid=latest"

But be aware that this fsimage file only contains the metadata as of the last name node start, since the name node merges the fsimage and edits files only during startup, as stated in the Hadoop documentation:

The NameNode stores modifications to the file system as a log appended to a native file system file, edits. When a NameNode starts up, it reads HDFS state from an image file, fsimage, and then applies edits from the edits log file. It then writes new HDFS state to the fsimage and starts normal operation with an empty edits file. Since NameNode merges fsimage and edits files only during start up, the edits log file could get very large over time on a busy cluster. Another side effect of a larger edits file is that next restart of NameNode takes longer.
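If a current fsimage is wanted before the backup, one option is to force a checkpoint while the name node is in safemode. This is an optional extra step, sketched here with standard dfsadmin commands; it is not required for the procedure below.

$ hdfs dfsadmin -safemode enter     # freeze the namespace so it cannot change
$ hdfs dfsadmin -saveNamespace      # merge the in-memory namespace into a new fsimage and roll the edits
$ hdfs dfsadmin -safemode leave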

To be safe, we need to shut down all the hadoop daemons and then back up the whole name node directory, including the edits files and the fsimage files.

From my observation, a standby name node cannot serve any read/write operation. If the HDFS client still points to the old name node after it has become the standby, you will see tons of warning messages like the following.

2014-09-25 06:18:24,247 INFO org.apache.hadoop.ipc.Server (IPC Server handler 63 on 9000): IPC Server handler 63 on 9000, call org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations from 10.61.147.15:58693 Call#1 Retry#0: error: org.apache.hadoop.ipc.StandbyException: Operation category READ is not supported in state standby

After that, tar and gzip one of the metadata folders, for example /mnt/var/lib/hadoop/dfs-name, and upload the archive to a safe place as the backup.

$ tree /mnt/var/lib/hadoop/dfs-name/
/mnt/var/lib/hadoop/dfs-name/
├── current
│   ├── edits_0000000000000000001-0000000000000025667
│   ├── edits_0000000000000025668-0000000000000027061
│   ├── edits_0000000000000027062-0000000000000027828
......
│   ├── edits_inprogress_0000000000000030491
│   ├── fsimage_0000000000000064526
│   ├── fsimage_0000000000000064526.md5
│   ├── fsimage_0000000000000065939
│   ├── fsimage_0000000000000065939.md5
│   ├── seen_txid
│   └── VERSION
└── in_use.lock

1 directory, 50 files

One sample backup script is listed here.

#!/usr/bin/env bash

echo "force the name node to enter safemode"
hdfs dfsadmin -safemode enter

echo "go to one of the meta file folders"
cd /mnt/var/lib/hadoop/

echo "pack the meta files"
tar zcvf dfs-name.tar.gz dfs-name

echo "upload the backup file to s3"
hadoop fs -copyFromLocal dfs-name.tar.gz s3://my_s3_bucket

echo "remove the backup file"
rm dfs-name.tar.gz

echo "leave the safemode"
hdfs dfsadmin -safemode leave
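Should the metadata ever need to be restored, the reverse of the script above is roughly the following sketch. It assumes the archive was uploaded as s3://my_s3_bucket/dfs-name.tar.gz, that the same local path is used, and that the name node is stopped before restoring.

$ hadoop fs -copyToLocal s3://my_s3_bucket/dfs-name.tar.gz /mnt/var/lib/hadoop/
$ cd /mnt/var/lib/hadoop/
$ tar zxvf dfs-name.tar.gz          # unpacks back into /mnt/var/lib/hadoop/dfs-name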

Configure Name Node HA

To configure name node HA, we need to select 3 nodes in the cluster to host the journal quorum and the zookeeper quorum, two of which will also run the name nodes. Here, I pick the original name node ip-10-235-3-184 and two other nodes, ip-10-16-135-195 and ip-10-61-147-15.

Update hadoop configurations on the 3 selected nodes

Remove the following line from core-site.xml

<property><name>fs.default.name</name><value>hdfs://10.235.3.184:9000</value></property>

and add the following line

<property><name>fs.defaultFS</name><value>hdfs://mycluster</value></property>

After that, add the following lines to hdfs-site.xml

<property><name>dfs.nameservices</name><value>mycluster</value></property>
<property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
<property><name>ha.zookeeper.quorum</name><value>ip-10-235-3-184:2181,ip-10-16-135-195:2181,ip-10-61-147-15:2181</value></property>
<property><name>dfs.journalnode.edits.dir</name><value>/mnt/var/lib/hadoop/journal/local/data</value></property>
<property><name>dfs.namenode.shared.edits.dir</name><value>qjournal://ip-10-235-3-184:8485;ip-10-16-135-195:8485;ip-10-61-147-15:8485/mycluster</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>ip-10-235-3-184:9000</value></property>
<property><name>dfs.namenode.http-address.mycluster.nn1</name><value>ip-10-235-3-184:9101</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>ip-10-16-135-195:9000</value></property>
<property><name>dfs.namenode.http-address.mycluster.nn2</name><value>ip-10-16-135-195:9101</value></property>
<property><name>dfs.ha.fencing.methods</name><value>shell(/bin/true)</value></property>
<property><name>dfs.ha.automatic-failover.enabled</name><value>true</value></property>
<property><name>dfs.client.failover.proxy.provider.mycluster</name><value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property>

Here dfs.ha.fencing.methods could be replaced with a real fencing method, such as a shell command that kills the old active name node, but we leave it as shell(/bin/true) for now.
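A quick way to confirm that the new values are actually picked up on each node is hdfs getconf. A small sanity check, assuming the updated configuration files have already been distributed:

$ hdfs getconf -confKey dfs.nameservices
mycluster
$ hdfs getconf -confKey dfs.ha.namenodes.mycluster
nn1,nn2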

set up the zookeeper quorum on the 3 selected nodes

On the node ip-10-235-3-184, run the following steps to set up zookeeper

$ wget http://apache.claz.org/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
$ tar zxvf zookeeper-3.4.6.tar.gz
$ ln -s zookeeper-3.4.6 zookeeper
$ mkdir -p /mnt/var/lib/zookeeper
$ echo "1" > /mnt/var/lib/zookeeper/myid
$ cd zookeeper
$ cp conf/zoo_sample.cfg conf/zoo.cfg

Edit zoo.cfg so that its content looks like the following.

$ cat ~/zookeeper/conf/zoo.cfg 
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/mnt/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60
server.1=ip-10-235-3-184:2888:3888
server.2=ip-10-16-135-195:2888:3888
server.3=ip-10-61-147-15:2888:3888

Do the same on nodes ip-10-16-135-195 and ip-10-61-147-15; the only difference is the myid.

On ip-10-16-135-195, run the following command

$ echo "2" > /mnt/var/lib/zookeeper/myid

On ip-10-61-147-15, set myid to "3" instead.

$ echo "3" > /mnt/var/lib/zookeeper/myid

start the zookeeper quorum on the 3 selected nodes

$ cd ~/zookeeper/bin/
$ ./zkServer.sh start
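Once all three servers are up, each node should report its role. A quick check using zkServer.sh and the ZooKeeper ruok four-letter command (this assumes nc is installed on the nodes):

$ ./zkServer.sh status              # should report Mode: leader on one node and Mode: follower on the others
$ echo ruok | nc localhost 2181     # should print imok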

start the journal quorum on the 3 selected nodes

$ sbin/hadoop-daemon.sh start journalnode
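A journal node can be verified with jps and by checking that it is listening on port 8485, the port used in dfs.namenode.shared.edits.dir. A small sketch:

$ jps | grep JournalNode
$ netstat -tln | grep 8485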

initialize journal quorum

On name node ip-10-235-3-184, run the following command.

$ hdfs namenode -initializeSharedEdits  -force

format the zkfc state in zookeeper

This creates the HA state znode in ZooKeeper and only needs to be run once, from one of the name node hosts.

$ hdfs zkfc -formatZK -force

bootstrap the second namenode

On the second name node host ip-10-16-135-195, run the following command

$ hdfs namenode -bootstrapStandby -force

start zkfc service on the two name nodes

The ZKFailoverController (ZKFC) is a new component: a ZooKeeper client that monitors and manages the state of the name node. Each machine that runs a name node also runs a ZKFC.

On the first name node ip-10-235-3-184 and the second name node ip-10-16-135-195, run the following command

$ sbin/hadoop-daemon.sh start zkfc
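To confirm that the failover controllers have registered with ZooKeeper, the HA znode can be inspected with zkCli.sh. A sketch, assuming the default parent znode /hadoop-ha and the nameservice id mycluster used above:

$ ~/zookeeper/bin/zkCli.sh -server ip-10-235-3-184:2181 ls /hadoop-ha/mycluster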

start name nodes

$ sbin/hadoop-daemon.sh start namenode

start data nodes

$ sbin/hadoop-daemon.sh start datanode

After finishing all the above steps, run the following commands to verify that HDFS works and that the data is still there.

$ hdfs dfsadmin -report

$ hdfs haadmin -getServiceState nn1

$ hdfs haadmin -getServiceState nn2
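It is also worth checking that clients resolve the new logical URI through the configured failover proxy provider. A simple smoke test:

$ hadoop fs -ls hdfs://mycluster/   # goes through the mycluster nameservice explicitly
$ hadoop fs -ls /                   # uses fs.defaultFS, which now points to hdfs://mycluster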

Configure Resource Manager HA


Remove all resource manager address-related configurations from yarn-site.xml, such as yarn.resourcemanager.webapp.address, yarn.resourcemanager.admin.address, yarn.resourcemanager.address, yarn.resourcemanager.resource-tracker.address, and yarn.resourcemanager.scheduler.address. Then add the following properties to yarn-site.xml.

<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>mycluster</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>ip-10-235-3-184</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>ip-10-16-135-195</value>
</property>
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>ip-10-235-3-184:2181,ip-10-16-135-195:2181,ip-10-61-147-15:2181</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>ip-10-235-3-184:9026</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>ip-10-16-135-195:9026</value>
</property>
<property>
  <name>yarn.resourcemanager.recovery.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.resourcemanager.store.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<property>
  <name>yarn.client.failover-proxy-provider</name>
  <value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value>
</property>

After that, start a resource manager on both ip-10-235-3-184 and ip-10-16-135-195.

$ sbin/yarn-daemon.sh start resourcemanager

Also start the job history server, the web proxy server, and all node managers so that they pick up the configuration changes in core-site.xml and yarn-site.xml.
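The corresponding daemon commands, assuming the stock Hadoop 2.4 sbin scripts, are roughly:

$ sbin/mr-jobhistory-daemon.sh start historyserver
$ sbin/yarn-daemon.sh start proxyserver
$ sbin/yarn-daemon.sh start nodemanager     # on every node manager host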

We can use the following commands to get resource manager HA state.

$ yarn rmadmin -getServiceState rm1
active

$  yarn rmadmin -getServiceState rm2
standby

Test Name Node HA and Resource Manager HA


Test the cluster by running teragen, terasort, and teravalidate

$ hadoop jar hadoop-examples.jar teragen 10000000000 /tera/input
$ hadoop jar hadoop-examples.jar terasort /tera/input /tera/output
$ hadoop jar hadoop-examples.jar teravalidate /tera/output /tera/report

Test name node failover

We can kill the active name node when the MR job is running to test the name node failover.
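One simple way to simulate a name node crash is to kill the active name node process and then watch the standby take over; a sketch, where the pid is found on the active host:

$ jps | grep NameNode                 # find the NameNode pid on the active host
$ kill -9 <pid>                       # simulate a crash
$ hdfs haadmin -getServiceState nn2   # the former standby should report active shortly after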

Test resource manager failover

We can kill the active resource manager when the MR job is running to test the resource manager failover.
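Similarly, kill the active resource manager process and confirm that the standby is promoted; a sketch, assuming rm1 was the active one:

$ jps | grep ResourceManager
$ kill -9 <pid>
$ yarn rmadmin -getServiceState rm2   # should report active after failover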