Friday, December 28, 2012

Use Compression with Mapreduce


Hadoop is intended for storing large data volumes, so compression is practically a requirement. Several compression formats are available, such as gzip, bzip2 and LZO. Of these, bzip2 (in the latest versions) and LZO (once the files are indexed) are splittable. bzip2 offers better compression, but decompressing it is expensive, so when both space and time matter LZO is the more advisable choice. LZO also supports indexing, which helps when you use hive on your data. To run mapreduce with compression we need to know at least the following:
1.       How to run map reduce on compressed data
2.       How to produce compressed output from mapreduce

Running Mapreduce on compressed data
It is very straightforward; there is no need to implement a custom input format. You can use any input format with compression. The only step is to add the compression codec to the value of io.compression.codecs.

If you are using LZO, the value would look something like
io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzopCodec

Then configure and run your map reduce jobs as you normally do on uncompressed files. When a map task has to process a compressed file, it checks io.compression.codecs and uses a suitable codec from there to read the file.
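The codec lookup can be pictured with a small plain-Java sketch. It is only an illustration and does not use Hadoop classes: the class CodecLookup and its methods are hypothetical, and the extension table (.gz, .deflate, .lzo) is hard-coded here as an assumption, whereas in Hadoop each codec reports its own default extension.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: mimics picking a codec by file extension.
// The extension table is an assumption of this example, not read from Hadoop.
final class CodecLookup {
    private final Map<String, String> codecBySuffix = new LinkedHashMap<>();

    // Registers the codecs named in an io.compression.codecs style value.
    CodecLookup(String codecsValue) {
        for (String name : codecsValue.split(",")) {
            String codec = name.trim();
            if (codec.endsWith("GzipCodec")) {
                codecBySuffix.put(".gz", codec);
            } else if (codec.endsWith("DefaultCodec")) {
                codecBySuffix.put(".deflate", codec);
            } else if (codec.endsWith("LzopCodec")) {
                codecBySuffix.put(".lzo", codec);
            }
        }
    }

    // Returns the codec class name for a file, or null if it is read as plain data.
    String codecFor(String fileName) {
        for (Map.Entry<String, String> entry : codecBySuffix.entrySet()) {
            if (fileName.endsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        CodecLookup lookup = new CodecLookup(
            "org.apache.hadoop.io.compress.GzipCodec,"
            + "org.apache.hadoop.io.compress.DefaultCodec,"
            + "com.hadoop.compression.lzo.LzopCodec");
        System.out.println(lookup.codecFor("part-00000.lzo"));
        System.out.println(lookup.codecFor("part-00000.txt"));
    }
}
```

The point of the sketch is that a codec missing from the configured list is simply never matched, so a compressed file would be handed to the mapper as raw bytes.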

Produce compressed data from map reduce
It is again straightforward; set the following parameters (using LZO here):
mapred.output.compress=true
mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

Your output is then compressed in LZO. Here too you can use any normal output format.

Index LZO files
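It takes just two lines of code:

//Run the LZO indexer on files in hdfs
//fs is an org.apache.hadoop.fs.FileSystem, filePath the org.apache.hadoop.fs.Path of the .lzo file
LzoIndexer indexer = new LzoIndexer(fs.getConf()); //com.hadoop.compression.lzo.LzoIndexer
indexer.index(filePath);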

Compress Intermediate output (map output)
Compressing the intermediate output is also a good idea in map reduce. The map outputs have to be copied across nodes to the reducers, and compressing them saves network bandwidth and transfer time. Just specify the following configuration parameters:
mapred.compress.map.output=true
mapred.map.output.compression.codec=com.hadoop.compression.lzo.LzoCodec

Query Hbase tables using hive/ Mount an Hbase table into hive


You can use Hbase as the data store for your hive table. On hive table creation we need to specify a mapping between the two. What needs to be provided:
1.       The hbase table name
2.       The mapping between hbase Column Family:Qualifier and hive columns
If you are mapping a whole hbase column family to a hive column, the data type of that hive column has to be Map. Also, in the DDL the table has to be specified as External.

Example of mapping an hbase table (employee_hbase) to a hive table (employee_hive):

CREATE EXTERNAL TABLE employee_hive(key INT, value1 STRING,value2 Map<STRING,STRING>)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val,cf2:")
TBLPROPERTIES("hbase.table.name" = "employee_hbase");
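To make the positional nature of the mapping explicit, here is a small plain-Java sketch that pairs the hive columns of the DDL above with the entries of hbase.columns.mapping. It does not use any hive or hbase classes; HBaseColumnMapping, describe and pair are hypothetical names made up for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: pairs hive columns with hbase.columns.mapping entries by position.
final class HBaseColumnMapping {
    // Describes what a single mapping entry points at in hbase.
    static String describe(String entry) {
        if (entry.equals(":key")) {
            return "row key";
        }
        String[] parts = entry.split(":", 2); // family, qualifier
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected family:qualifier but got " + entry);
        }
        return parts[1].isEmpty()
            ? "whole column family " + parts[0] + " (hive Map column)"
            : "column " + parts[0] + ":" + parts[1];
    }

    // The n-th hive column is backed by the n-th mapping entry.
    static Map<String, String> pair(String[] hiveColumns, String mapping) {
        String[] entries = mapping.split(",");
        if (entries.length != hiveColumns.length) {
            throw new IllegalArgumentException("Need one mapping entry per hive column");
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < hiveColumns.length; i++) {
            result.put(hiveColumns[i], describe(entries[i].trim()));
        }
        return result;
    }

    public static void main(String[] args) {
        String[] hiveColumns = {"key", "value1", "value2"};
        System.out.println(pair(hiveColumns, ":key,cf1:val,cf2:"));
    }
}
```

Read this way, value2 has to be a Map because cf2: names a family without a qualifier, so every qualifier in that family ends up as a map key.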

Enable Multiple threads in a mapper aka MultithreadedMapper


As the name suggests, it is a map task that spawns multiple threads. A map task can be considered a process that runs within its own jvm boundary; a multithreaded mapper spawns multiple threads within that same map task. Don’t confuse this with multiple tasks within the same jvm (that is achieved with jvm reuse). A multithreaded task still uses the input split as defined by the input format, and the record reader reads the input as in a normal map task. The multithreading happens after this stage: once a record has been read, the map processing is divided among multiple threads (ie the input IO is not multithreaded; the threads come into the picture after that).
MultithreadedMapper is a good fit if your operation is highly CPU intensive and multiple threads getting multiple cycles could speed up the task. If the job is IO intensive, running multiple tasks is much better than multithreading, because with multiple tasks multiple IO reads happen in parallel.
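The split between single-threaded reading and multithreaded processing can be sketched in plain Java with a thread pool. This is not Hadoop code: MultithreadedMapSketch, map and runSplit are hypothetical names, the list stands in for an input split, and the map function is a trivial placeholder for CPU-heavy work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch in plain Java (no Hadoop classes): one reader, many map threads.
final class MultithreadedMapSketch {
    // Placeholder for a CPU-heavy map() call; here it only measures the record length.
    static int map(String record) {
        return record.length();
    }

    // Processes one "split" with the given number of threads inside a single task.
    static List<Integer> runSplit(List<String> split, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            // Reading stays single-threaded: this loop plays the record reader.
            for (String record : split) {
                Callable<Integer> task = () -> map(record);
                pending.add(pool.submit(task)); // the map work runs on pool threads
            }
            List<Integer> output = new ArrayList<>();
            for (Future<Integer> result : pending) {
                output.add(result.get());
            }
            return output;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while mapping", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A map thread failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runSplit(Arrays.asList("alpha", "be", "gam"), 2));
    }
}
```

Because only one loop reads the records, adding threads helps only when the map work per record dominates the read time, which is why the approach suits CPU-bound jobs.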
Let us see how we can use MultithreadedMapper. The approach differs between the old mapreduce API and the new API.
Old API
Enable the multithreaded map runner as
-D mapred.map.runner.class=org.apache.hadoop.mapred.lib.MultithreadedMapRunner
Or
jobConf.setMapRunnerClass(org.apache.hadoop.mapred.lib.MultithreadedMapRunner.class);

New API
Set org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper as the mapper class of the job, and register your own mapper (which still extends org.apache.hadoop.mapreduce.Mapper) with MultithreadedMapper.setMapperClass(job, YourMapper.class). MultithreadedMapper has a different implementation of the run() method, which runs your mapper on multiple threads.

You can set the number of threads within a mapper by
MultithreadedMapper.setNumberOfThreads(job, n); or
mapred.map.multithreadedrunner.threads=n
 


Note: Don’t assume that a multithreaded mapper is better than normal map reduce just because it spawns fewer jvms and fewer processes. If a mapper is loaded with lots of threads, the chances of that jvm crashing are higher, and the cost of re-executing such a hadoop task would be terribly high.
Don’t use MultithreadedMapper to control the number of jvms spawned. If that is your goal, tweak the mapred.job.reuse.jvm.num.tasks parameter, whose default value is 1, meaning no jvm reuse across tasks.
The threads live at the bottom level, ie within a map task; the higher levels of the hadoop framework, like the job, have no knowledge of them.

Tuesday, December 25, 2012

How to recover deleted files from hdfs/ Enable trash in hdfs

If you enable trash in hdfs, a file deleted with rmr stays available in trash for some period, so you can recover accidentally deleted files. To enable hdfs trash
set fs.trash.interval > 0

 
This specifies how long a deleted file stays available in trash. Another property, fs.trash.checkpoint.interval, specifies the checkpoint interval: the NN checks the trash dir at every such interval and deletes all files older than fs.trash.interval. For example, if fs.trash.interval is 60 mins and fs.trash.checkpoint.interval is 30 mins, then a check is performed every 30 mins and all files that are more than 60 mins old are deleted.
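The interplay of the two intervals can be worked through with a small plain-Java sketch. It follows the simplified description above rather than Hadoop's actual trash implementation; TrashTiming and purgeMinute are hypothetical names, times are in minutes, and the checks are assumed to run at minute 0, 30, 60 and so on.

```java
// Illustrative sketch of the timing described above; all times are in minutes.
final class TrashTiming {
    // Returns the minute of the first periodic check at which a file deleted at
    // deletedAt is strictly older than trashInterval and therefore purged.
    // Assumption of this sketch: checks run at multiples of checkpointInterval.
    static long purgeMinute(long deletedAt, long trashInterval, long checkpointInterval) {
        if (deletedAt < 0 || trashInterval <= 0 || checkpointInterval <= 0) {
            throw new IllegalArgumentException("Times must be non-negative and intervals positive");
        }
        long stillKeptUntil = deletedAt + trashInterval; // age equals the interval here
        // First multiple of checkpointInterval strictly after stillKeptUntil.
        return (stillKeptUntil / checkpointInterval + 1) * checkpointInterval;
    }

    public static void main(String[] args) {
        // File deleted 10 minutes in, with the 60/30 settings from the example above.
        System.out.println(purgeMinute(10, 60, 30));
    }
}
```

With these settings a deleted file survives for somewhere between 60 and 90 minutes, depending on where the deletion falls relative to the checks.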

fs.trash.checkpoint.interval should be equal to or less than fs.trash.interval

The value of fs.trash.interval  is specified in minutes.

fs.trash.interval should be set on the client node as well as on the Name Node. On the Name Node it is needed for checkpointing purposes. The value on the client node decides whether an rmr issued from that client removes a file completely from hdfs or moves it to trash.

The trash dir by default is /user/X/.Trash

-libjars not working in custom mapreduce code, How to debug

Application developers often bump into this issue. They ship their custom jars with the map reduce job, but when the code refers to classes in those jars it throws a ClassNotFoundException.

For -libjars to work your main class should satisfy the following two conditions.
 
1) Main Class should implement the Tool interface


 //wrong usage - Tool Interface not implemented
public class WordCount extends Configured {

//right usage
public class WordCount extends Configured implements Tool {
 
2) Main Class should get the existing configuration using the getConf() method rather than creating a new configuration instance.


//wrong usage - creating a new instance of Conf
public int run(String[] args) throws Exception {
   Configuration conf = new Configuration();
 
//right usage
public int run(String[] args) throws Exception {
   Configuration conf = getConf();

Are compression codecs required on client nodes?

Sometimes, even with compression codecs available on all nodes in the cluster, some jobs fail with a class not found error for the compression codecs.


Compression and decompression are done by the task trackers, but in certain cases the compression codecs are required on the client nodes as well.

Some scenarios are
 Total Order Partitioner
          Before triggering the mapreduce job, the job needs to know the ranges of keys. Only then can it decide which range of keys goes to which reducer. These ranges are needed before the map tasks start, so the client first takes a random sample across the input data (the seek could be like: read the first 20 mb, skip the next 200 mb, read the next 20 mb, etc).

 Hive and Pig
For better optimization of jobs, uniform distribution of data across reducers, determining the number of reducers etc, hive and pig do a quick seek on samples of the input data.

In both cases a sample of the input data is read on the client side before the MR tasks start, so if the data is compressed the compression codec needs to be available on the client node as well.
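What the client does with such a sample can be sketched in plain Java: sort the sampled keys, pick split points, and route each key to a reducer by range. This is a simplified illustration and not Hadoop's sampler or partitioner code; RangePartitionSketch, splitPoints and reducerFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative sketch: derive key ranges from a client-side sample.
final class RangePartitionSketch {
    // Picks (reducers - 1) split points from a sorted copy of the sampled keys.
    static List<String> splitPoints(List<String> sampledKeys, int reducers) {
        if (sampledKeys.isEmpty() || reducers < 1) {
            throw new IllegalArgumentException("Need a non-empty sample and at least one reducer");
        }
        List<String> sorted = new ArrayList<>(sampledKeys);
        Collections.sort(sorted);
        List<String> points = new ArrayList<>();
        for (int i = 1; i < reducers; i++) {
            points.add(sorted.get(i * sorted.size() / reducers));
        }
        return points;
    }

    // Reducer index for a key: the number of split points that are <= the key.
    static int reducerFor(String key, List<String> points) {
        int index = 0;
        while (index < points.size() && key.compareTo(points.get(index)) >= 0) {
            index++;
        }
        return index;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("m", "c", "x", "a", "t", "f");
        List<String> points = splitPoints(sample, 3);
        System.out.println(points + " -> reducer for 'k' is " + reducerFor("k", points));
    }
}
```

Since the sample has to be read and decoded before any task is launched, this step runs on the client, which is where the codec is needed.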

Hive Hbase integration/ Hive HbaseHandler : Common issues and resolution


When we try out hive hbase integration, it commonly leads to lots of unexpected errors even though hive and hbase individually run without any issues. Some common issues are

1) jars not available
The following jars should be available on the hive auxpath
  1. /usr/lib/hive/lib/hive-hbase-handler-0.7.1-cdh3u2.jar
  2. /usr/lib/hive/lib/hbase-0.90.4-cdh3u2.jar
  3. /usr/lib/hive/lib/zookeeper-3.3.1.jar
These jars vary with the hbase, hive and zookeeper versions running on your cluster

2) zookeeper quorum not available for hive client
Specify the ZooKeeper server host names so that the active hbase master can be located for the hbase connection

hbase.zookeeper.quorum=zk1,zk2,zk3

Where zk1,zk2,zk3 should be the actual hostnames of the ZooKeeper servers


These values can be set either in your hive session or permanently in your hive configuration files

1) Setting in hive client session
$ hive --auxpath /usr/lib/hive/lib/hive-hbase-handler-0.7.1-cdh3u2.jar:/usr/lib/hive/lib/hbase-0.90.4-cdh3u2.jar:/usr/lib/hive/lib/zookeeper-3.3.1.jar -hiveconf hbase.zookeeper.quorum=zk1,zk2,zk3

2) Setting in hive-site.xml
<property>
<name>hive.aux.jars.path</name>
<value>file:///usr/lib/hive/lib/hive-hbase-handler-0.7.1-cdh3u2.jar,file:///usr/lib/hive/lib/hbase-0.90.4-cdh3u2.jar,file:///usr/lib/hive/lib/zookeeper-3.3.1.jar,file:///usr/lib/hive/lib/hive-contrib-0.7.1-cdh3u2.jar</value>
</property>

<property>
<name>hbase.zookeeper.quorum</name>
<value>zk1,zk2,zk3</value>
</property>


Still hive hbase integration not working

Error thrown : Master not running

Check whether the HbaseMaster is really down. If it is running fine, there could be other causes, a few common ones being

  • Check whether hbase.zookeeper.quorum is set correctly; it shouldn’t be localhost.
  • If multiple hbase clusters share the same zookeeper quorum, the znode parent value will be different for each. In that case ‘zookeeper.znode.parent’ also has to be set in the hive configuration, to the correct value from hbase-site.xml.

Monday, December 24, 2012

Optimizing Joins in hive/Sorting Java Heap issues with hive joins


In hadoop we tend to use hive extensively, since it is a SQL-like language and makes it easier to frame jobs on stored structured data. (Pig is great too, but it takes a little time to get comfortable with Pig Latin.) But as beginners we often get stuck with hive joins on large data sets. Running into out of memory/java heap space errors on joins with huge hive tables is a common scenario. We can avoid these bottlenecks to a great extent by using a few smarter options available in hive. Let us look into a few of them.

1.       Enable map joins
Enabling map joins in hive is a good approach when you join multiple tables and one or more of them has a small data volume. With this enabled, the smaller tables are turned into hash tables by a local map reduce task and distributed through the distributed cache before the actual map reduce job. This can save considerable time, as the join becomes a map side join instead of a common reduce side join (the normal hive join). Set the following at the hive CLI before running the join query
               set hive.auto.convert.join = true;

Note that hive is intelligent enough with map side joins: if the data volume is too large to fit a map side join, it executes the backup task, ie the common join (full map reduce execution), to accomplish the job. The time spent checking whether a map join is feasible is therefore an overhead, so if you are sure the tables you join are always huge, it is better not to enable this for your job.
We have used the term ‘small table’ a lot here, but how small does the table have to be? By default the small table size is 25 MB, so if the table is larger than 25 MB the hive common join is triggered. However, 25 MB is conservative, and you can change it to a desirable value by setting the following configuration variable.
                set hive.smalltable.filesize = 40000000;
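The idea behind a map side join can be sketched in plain Java: the small table is held in memory as a hash table and the big table is streamed past it, so no shuffle or sort is needed. This is only an illustration of the technique, not hive's implementation; MapJoinSketch and its methods are hypothetical names, and the threshold constant simply mirrors the 25 MB default mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of a map side (hash) join in plain Java.
final class MapJoinSketch {
    // Assumed threshold for this example, mirroring the 25 MB default quoted above.
    static final long SMALL_TABLE_BYTES = 25_000_000L;

    // A table larger than the threshold would fall back to the common join.
    static boolean qualifiesForMapJoin(long tableBytes) {
        return tableBytes <= SMALL_TABLE_BYTES;
    }

    // Streams the big table rows {key, value} past the in-memory small table.
    static List<String> hashJoin(Map<String, String> smallTable, List<String[]> bigRows) {
        List<String> joined = new ArrayList<>();
        for (String[] row : bigRows) {
            String match = smallTable.get(row[0]); // constant-time lookup, no sort phase
            if (match != null) {
                joined.add(row[0] + "," + row[1] + "," + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> departments = new HashMap<>();
        departments.put("1", "HR");
        List<String[]> employees = new ArrayList<>();
        employees.add(new String[] {"1", "alice"});
        employees.add(new String[] {"2", "bob"});
        System.out.println(hashJoin(departments, employees));
    }
}
```

The memory cost of the hash table is what limits the approach, which is why the size threshold exists.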

2.       Exploiting EQUI Join support in hive
Hive supports only equi joins, and we can exploit this in our hive queries to get rid of OOM errors as well as another common scenario, hive queries that seem to run forever. It is relatively straightforward: if your join query has where clauses with equality conditions, include them inside the ON clause of the join. This considerably reduces the number of records chosen for the join and hence the number of records in the sort phase.

                For eg: let us consider a query like this
Select Table1.Column1, Table2.Column2 FROM Table1 JOIN Table2 ON (Table1.Column5 = Table2.Column7 AND Table1.Column9=Table2.Column3) WHERE Table1.Column1 = '1024' AND Table2.Column2 > 5;

This query has an equality expression in the where clause involving one of the tables in the join. We can optimize the hive query by including the equality filter condition as part of the join:

Select Table1.Column1, Table2.Column2 FROM Table1 JOIN Table2 ON (Table1.Column5 = Table2.Column7 AND Table1.Column9=Table2.Column3 AND Table1.Column1 = '1024') WHERE Table2.Column2 > 5;

The difference in execution is that in the first query the filter condition is applied after the join, which means more records are involved in the join, whereas in the second query the filter is applied before/while joining, hence fewer records in the join.
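The effect of the two orderings can be counted with a small plain-Java sketch. It models the behaviour as described above, not what hive's planner actually does for a given query; JoinFilterSketch and run are hypothetical names, and each left row is just a {joinKey, column1} pair.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: the same inner join with the filter applied early or late.
final class JoinFilterSketch {
    // Returns {rows that took part in the join, rows in the final output}.
    static int[] run(List<String[]> left, Map<String, String> right,
                     String wanted, boolean filterFirst) {
        int joined = 0;
        int output = 0;
        for (String[] row : left) { // row = {joinKey, column1}
            boolean passes = row[1].equals(wanted);
            if (filterFirst && !passes) {
                continue; // dropped before it reaches the join
            }
            if (!right.containsKey(row[0])) {
                continue; // no join partner
            }
            joined++;
            if (passes) {
                output++; // late filter; always true when filterFirst is set
            }
        }
        return new int[] {joined, output};
    }

    public static void main(String[] args) {
        List<String[]> left = new ArrayList<>();
        left.add(new String[] {"k1", "1024"});
        left.add(new String[] {"k2", "7"});
        Map<String, String> right = new HashMap<>();
        right.put("k1", "a");
        right.put("k2", "b");
        System.out.println(run(left, right, "1024", false)[0] + " vs " + run(left, right, "1024", true)[0]);
    }
}
```

Both orderings give the same output for an inner join; for outer joins, moving a condition from WHERE into ON can change the result, so the rewrite should be checked per query.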

3.       Increase the heap Size
This has to be one of the options if your hive query is already optimized, satisfies the first two checks, and the execution still halts due to heap size issues. You can increase the heap size for the map reduce child tasks by setting the property ‘mapred.child.java.opts’ to a higher value. For 1GB set it as
mapred.child.java.opts=-Xmx1024m


There are many other options to deal with these issues, like working on io.sort.mb, deciding on the maximum number of mappers/reducers etc. It is left to you to google them and hit the bulls eye based on your use case/hive query.


Performance tuning of hive queries



Hive performance optimization is a large topic on its own and is very specific to the queries you are using. In fact, each query in a query file needs separate performance tuning to get the best results.

I'll try to list a few general approaches used for performance optimization
Limit the data flow down the queries
In a hive query, the volume of data that flows down each level is the factor that decides performance. So if you are executing a script that contains a sequence of hive QL statements, make sure the data filtration happens in the first few stages rather than carrying unwanted data to the bottom. This gives significant performance gains, as the queries further down have far less data to crunch.

This is a common bottleneck when existing SQL jobs are ported to hive: we just execute the same sequence of SQL steps in hive, which becomes a bottleneck for performance. Understand the requirement or the existing SQL script and design your hive job with the data flow in mind.
Use hive merge files
Hive queries are parsed into map only and map reduce jobs. A hive script can contain lots of hive queries. Assume one of your queries is parsed into a mapreduce job and the output files from the job are very small, say 10 mb. In such a case the subsequent query that consumes this data may generate a larger number of map tasks and would be inefficient. If more jobs run on the same data set, all of them become inefficient. If you enable merge files in hive in such scenarios, the first query runs a merge job at the end, thereby merging the small files into larger ones. This is controlled using the following parameters

hive.merge.mapredfiles=true
hive.merge.mapfiles=true (true by default in hive)

For more control over merge files you can tweak these properties as well
hive.merge.size.per.task (the max final size of a file after the merge task)
hive.merge.smallfiles.avgsize (the merge job is triggered only if the average output file size is less than the specified value)

The default values for the above properties are
hive.merge.size.per.task=256000000
hive.merge.smallfiles.avgsize=16000000

When you enable merge, an extra map only job is triggered. Whether this job gives you an optimization or an overhead depends entirely on your use case and queries.
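The trigger condition for the merge job can be written out as a small plain-Java check. It only restates the rule described above and is not hive's code; MergeDecisionSketch and shouldMerge are hypothetical names, and the constant repeats the default value quoted above.

```java
// Illustrative sketch of the average-size rule that triggers the merge job.
final class MergeDecisionSketch {
    // Default of hive.merge.smallfiles.avgsize as quoted above.
    static final long SMALLFILES_AVGSIZE = 16_000_000L;

    // True when the average output file size is below the threshold.
    static boolean shouldMerge(long[] outputFileBytes, long avgSizeThreshold) {
        if (outputFileBytes.length == 0) {
            return false; // nothing to merge
        }
        long total = 0;
        for (long bytes : outputFileBytes) {
            total += bytes;
        }
        return total / outputFileBytes.length < avgSizeThreshold;
    }

    public static void main(String[] args) {
        long[] tenMbFiles = {10_000_000L, 10_000_000L, 10_000_000L};
        System.out.println(shouldMerge(tenMbFiles, SMALLFILES_AVGSIZE));
    }
}
```

Note that the rule looks at the average, so a few large files can mask many tiny ones and keep the merge job from running.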

Join Optimizations
Joins are very expensive. Avoid them if possible. If a join is required, try to use join optimizations such as map joins, bucketed map joins etc


There is still more to say on hive query performance optimization; take this post as a baby step. More will be added to this post soon. :)


Wednesday, December 19, 2012

Installing Apache Thrift in Ubuntu


Apache Thrift is an RPC framework for scalable cross-language services development. In simpler words, it allows easy exchange of data such as variables and objects between applications written in different languages. Thrift offers seamless cross-language serialization between languages like C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk and OCaml, through code generation. The advantage of Thrift is that it is faster than SOAP, since it uses a binary protocol.

For a better understanding of Thrift's architecture, which consists of Transports, Protocols and Processors, I recommend going through the paper in [1].
Thrift was initially developed at Facebook and is now open sourced as an Apache project. Since I found Thrift to be sparsely documented, in this post I'll walk through the installation steps on Ubuntu.

I have tested this on Ubuntu 11.10 and 12.04

To install Thrift

Step 1.
First install the required packages:
sudo apt-get install libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev

Step 2.
Then download the tar.gz archive from the download site and decompress it in your home directory:
tar -xvzf thrift-0.8.0.tar.gz

Step 3.
Go to the installation root directory and run
$ ./configure

Step 4.
At the end of the output you will find something like this

thrift 0.8.0
Building code generators ..... :
Building C++ Library ......... : yes
Building C (GLib) Library .... : no
Building Java Library ........ : yes
Building C# Library .......... : no
Building Python Library ...... : yes
Building Ruby Library ........ : no
Building Haskell Library ..... : no
Building Perl Library ........ : no
Building PHP Library ......... : no
Building Erlang Library ...... : no
Building Go Library .......... : no
Building TZlibTransport ...... : yes
Building TNonblockingServer .. : yes
Using javac .................. : javac
Using java ................... : java
Using ant .................... : /usr/bin/ant
Using Python ................. : /usr/bin/python

Check whether the libraries for your development environment are going to be built. If something is missing, skim through the configure output to find the missing component, install it, and then run step 3 again.

Step 5.
Finally from the same directory run
$ make

Step 6.
Now run
$ sudo make install

Now you're done

To check whether Thrift is working, run:
$ thrift -version

[1] http://thrift.apache.org/static/thrift-20070401.pdf

Tuesday, December 18, 2012

Install Ubuntu Tweak in Ubuntu 12.10


Use Ubuntu Tweak to customize Ubuntu 12.10


1. Open a terminal window.
2. Type in the following commands, hitting Enter after each.
sudo add-apt-repository ppa:tualatrix/ppa
sudo apt-get update
sudo apt-get install ubuntu-tweak