Most Hadoop administrators set the default replication factor for their files to be three. The main assumption here is that if you keep three copies of the data, your data is safe. I have observed this to be true in the big clusters that we manage and operate. In actuality, administrators are managing two failure aspects: data corruption and data availability.
If all the datanodes on which the replicas of a block exist catch fire at the same time, then that data is lost and cannot be recovered. Or if an administrative error causes all the existing replicas of a block to be deleted, then it is a catastrophic failure. This is data corruption. On the other hand, if a rack switch goes down for sometime, the datanodes on that rack are in-accessible during that time. When that faulty rack switch is fixed, the data on the rack rejoins the HDFS cluster and life goes on as usual. This is a data avilability issue; in this case data was not corrupted or lost, it was just unavailable for some time. HDFS keeps three copies of a block on three different datanodes to protect against true data corruption. HDFS also tries to distribute these three replicas on more than one rack to protect against data availability issues. The fact that HDFS actively monitors any failed datanode(s) and upon failure detection immediately schedules re-replication of blocks (if needed) implies that three copies of data on three different nodes is sufficient to avoid corrupted files.
HDFS uses a simple but highly effective policy to allocate replicas for a block. If a process that is running on any of the HDFS cluster nodes open a file for writing a block, then one replica of that block is allocated on the same machine on which the client is running. The second replica is allocated on a randomly chosen rack that is different from the rack on which the first replica was allocated. The third replica is allocated on a randomly chosen machine on the same remote rack that was chosen in the earlier step. This means that a block is present on two unique racks. One point to note is that there is no relationship between replicas of different blocks of the same file as far as their location is concerned. Each block is allocated independently.
The above algorithm is great for availability and scalability. However, there are scenarios where co-locating many block of the same file on the same set of datanode(s) or rack(s) is beneficial for performance reasons. For example, if many blocks of the same file are present on the same datanode(s), a single mapper instance could process all these blocks using the CombineFileInputFormat. Similarly, if a dataset contains many small files that are co-located on the same datanode(s) or rack(s), one can use CombineFileInputFormat to process all these file together by using fewer mapper instances via CombineFileInputFormat. If an application always uses one dataset with another dataset (think Hive or Pig join), then co-locating these two datasets on the same set of datanodes is beneficial.
Another reason when one might want to allocate replicas using a different policy is to ensure that replicas and their parity blocks truly reside in different failure domains. The erasure code work in HDFS could effectively bring down the physical replication factor of a file to about 1.5 (while keeping the logical replication factor at 3) if it can place replicas of all blocks in a stripe more intelligently.
Yet another reason, however exotic, is to allow HDFS to place replicas based on the HeatMap of your cluster. If one of of the node in the cluster is at a higher temperature than that of another, then it might be better to prefer the cooler node while allocating a new replica. If you want to experiment with HDFS across two data centers, you might want to try out new policies for replica placement.
Well, now you can finally get your hands wet! HDFS-385 is part of the Hadoop trunk and will be part of the next major HDFS 0.21 release. This feature provides a way for the adventurous developer to write Java code that specifies how HDFS should allocate replicas of blocks of a file. The API is experimental in nature, and could change in the near future if we discover any in-efficiencies in it. Please let the Hadoop community know if you need any changes in this API or if you come across novel uses of this API.
If all the datanodes on which the replicas of a block exist catch fire at the same time, then that data is lost and cannot be recovered. Or if an administrative error causes all the existing replicas of a block to be deleted, then it is a catastrophic failure. This is data corruption. On the other hand, if a rack switch goes down for sometime, the datanodes on that rack are in-accessible during that time. When that faulty rack switch is fixed, the data on the rack rejoins the HDFS cluster and life goes on as usual. This is a data avilability issue; in this case data was not corrupted or lost, it was just unavailable for some time. HDFS keeps three copies of a block on three different datanodes to protect against true data corruption. HDFS also tries to distribute these three replicas on more than one rack to protect against data availability issues. The fact that HDFS actively monitors any failed datanode(s) and upon failure detection immediately schedules re-replication of blocks (if needed) implies that three copies of data on three different nodes is sufficient to avoid corrupted files.
HDFS uses a simple but highly effective policy to allocate replicas for a block. If a process that is running on any of the HDFS cluster nodes open a file for writing a block, then one replica of that block is allocated on the same machine on which the client is running. The second replica is allocated on a randomly chosen rack that is different from the rack on which the first replica was allocated. The third replica is allocated on a randomly chosen machine on the same remote rack that was chosen in the earlier step. This means that a block is present on two unique racks. One point to note is that there is no relationship between replicas of different blocks of the same file as far as their location is concerned. Each block is allocated independently.
The above algorithm is great for availability and scalability. However, there are scenarios where co-locating many block of the same file on the same set of datanode(s) or rack(s) is beneficial for performance reasons. For example, if many blocks of the same file are present on the same datanode(s), a single mapper instance could process all these blocks using the CombineFileInputFormat. Similarly, if a dataset contains many small files that are co-located on the same datanode(s) or rack(s), one can use CombineFileInputFormat to process all these file together by using fewer mapper instances via CombineFileInputFormat. If an application always uses one dataset with another dataset (think Hive or Pig join), then co-locating these two datasets on the same set of datanodes is beneficial.
Another reason when one might want to allocate replicas using a different policy is to ensure that replicas and their parity blocks truly reside in different failure domains. The erasure code work in HDFS could effectively bring down the physical replication factor of a file to about 1.5 (while keeping the logical replication factor at 3) if it can place replicas of all blocks in a stripe more intelligently.
Yet another reason, however exotic, is to allow HDFS to place replicas based on the HeatMap of your cluster. If one of of the node in the cluster is at a higher temperature than that of another, then it might be better to prefer the cooler node while allocating a new replica. If you want to experiment with HDFS across two data centers, you might want to try out new policies for replica placement.
Well, now you can finally get your hands wet! HDFS-385 is part of the Hadoop trunk and will be part of the next major HDFS 0.21 release. This feature provides a way for the adventurous developer to write Java code that specifies how HDFS should allocate replicas of blocks of a file. The API is experimental in nature, and could change in the near future if we discover any in-efficiencies in it. Please let the Hadoop community know if you need any changes in this API or if you come across novel uses of this API.
No comments:
Post a Comment