Wednesday, January 23, 2013

Hadoop Performance tuning (Hadoop-Hive) Part 2



[Note: This post is the second part of Hadoop performance tuning; if you reached this page directly, please click here for part 1.]

I tested these parameters on a Hadoop and Hive setup using SQL-based queries. To check the performance improvement from each configuration parameter, I used a sample data set of 100 million records and ran some complex queries through the Hive interface on top of Hadoop. In this part 2 we will look at a few more Hadoop configuration parameters that help get the maximum performance improvement out of a Hadoop cluster.

Map Output compression ( mapred.compress.map.output )
By default this value is set to false. It is recommended to set it to true for clusters with a large amount of input data to be processed, because compression makes data transfer between nodes faster. Map output does not move directly to the reducer; it is first written to disk, so this setting also saves disk space and speeds up disk reads and writes. It is not recommended to set this parameter to true for small amounts of input data, because the time spent compressing and decompressing will increase the overall processing time. For big data, however, compression and decompression time is small compared to the time saved in data transfer and disk I/O.

Once we set the above configuration parameter to true, other dependent parameters become relevant, such as the compression technique (codec) and the compression type.

Compression method, technique, or codec (mapred.map.output.compression.codec)
The default value for this parameter is org.apache.hadoop.io.compress.DefaultCodec. Another available codec is org.apache.hadoop.io.compress.GzipCodec. DefaultCodec takes more time but compresses more; the LZO method takes less time to compress but gives a lower compression ratio. Our own codec can also be added. Add the codec or compression library that best suits your input data type.

The mapred.map.output.compression.type parameter controls the granularity at which data is compressed. Users can set either RECORD or BLOCK. RECORD is the default type, in which each individual key-value pair is compressed. BLOCK is the recommended type, in which groups of key-value pairs in a data block are compressed together; this gives a better compression ratio and helps with sorting data on the reducer side. In Cloudera Hadoop, the default type is set to BLOCK for better performance.

There are three more related configuration parameters:

1. mapred.output.compress
2. mapred.output.compression.type 
3. mapred.output.compression.codec

The same rules described above apply here, but these parameters are meant for MapReduce job output. The first three parameters (discussed earlier) configure compression for the map output alone, while these three specify whether the final output of every job should be compressed and, if so, with which type and codec.
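As an illustration, here is a minimal sketch of how these six parameters could be set programmatically for a single job through Hadoop's generic Configuration API, instead of cluster-wide in mapred-site.xml. The class name CompressionSettings and the choice of GzipCodec are only assumptions for the example; pick the codec and values that suit your own data and cluster.

import org.apache.hadoop.conf.Configuration;

public class CompressionSettings {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Compress intermediate map output (the parameters discussed above)
        conf.setBoolean("mapred.compress.map.output", true);
        conf.set("mapred.map.output.compression.codec",
                 "org.apache.hadoop.io.compress.GzipCodec");
        conf.set("mapred.map.output.compression.type", "BLOCK");

        // Compress the final job output as well (the three parameters in this section)
        conf.setBoolean("mapred.output.compress", true);
        conf.set("mapred.output.compression.type", "BLOCK");
        conf.set("mapred.output.compression.codec",
                 "org.apache.hadoop.io.compress.GzipCodec");

        // Print one value back just to verify what the job would run with
        System.out.println("map output compressed: "
                + conf.getBoolean("mapred.compress.map.output", false));
    }
}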

These suggestions were observed on a Hadoop cluster running Hive queries. Please leave a comment and recommend this post by clicking the Facebook 'Like' button and '+1' at the bottom of this page.

Reasons for Cloud Computing's popularity and rapid development


Nowadays the number of cloud computing users is growing exponentially, because cloud computing offers features such as pooled resources, elasticity, fast instance startup, on-demand computing, a self-service model, location independence, multi-tenancy, reliability, easy maintenance, and a pay-per-use model. One more important reason is that cloud computing is not a single technology; it is a group of technologies bundled under the name cloud computing. Put another way, cloud service providers borrow many features from other technologies and keep introducing new ones alongside the cloud to beat their competitors.


Which technologies are clubbed together in Cloud Computing, or what are its underlying technologies?

Client-server model - A distributed environment in which the client requests a service, the server processes the request and sends back the result, and the computation happens on the server.


Grid computing - Parallel computing built on top of the client-server model. People often call a grid a supercomputer, depending on the capacity of the cluster; it is essentially a group of processors, often more than a thousand, running in parallel at the same time. This type of model is used to process complex tasks.

Mainframe computer - A powerful computer used to process huge amounts of data; large companies that perform millions of transactions per day usually use this type of system.

Utility computing - A model in which the provider charges the customer only for what they consume, similar to a utility bill (e.g. electricity or phone).

Autonomic computing - A model with self-managing capability in a distributed environment. It has features like self-configuration, self-healing, self-optimization, and self-protection, with the help of Artificial Intelligence.

A real-time example that uses all of these technologies together with the Cloud:

Take an Amazon or eBay type of website, which runs under the client-server model. As it is used all over the world, it certainly needs a powerful system, like a mainframe or a grid computing setup, to process all customer requests. But there are situations, such as a festival season (Christmas Eve, for example), when traffic over the network is high because people buy many more gifts than usual. Instead of buying new servers to manage this traffic for a few days, major companies can use the cloud, in which resources like network bandwidth, processors, and memory scale automatically with the help of autonomic computing, and which charges on the utility computing model. This lets companies save on capital expenditure while still giving good service to their customers.

Hadoop with Hive


Nowadays, there are lots of Hadoop emerging. Indeed, by “Lots of Hadoop”, I mean companies releasing their own versions of Hadoop (e.g. Cloudera) by building a layer over the original Apache Hadoop distribution. We can also call these “customized” versions of Apache Hadoop. But when we think about the core part, it remains the same across different Hadoop flavors. Apache Software Foundation (ASF) focuses on improving Hadoop by bringing many smaller sub-projects under it to facilitate open source tools development around Hadoop. Hive happens to be one of Hadoop’s more prominent child projects.
Hive is a data warehouse infrastructure initially developed by Facebook. The Hadoop-with-Hive combination gives us the advantages of a distributed file system, Map-Reduce, and SQL. As we know, to process huge amounts of data in Hadoop we have to write a new Map-Reduce program (job) for each and every process/operation. For users with a limited number of operations, or sequences of the same operation, this task is an easy one. But for those whose requirements are more prone to change, the challenge is that they have to write a new Map-Reduce program for every new requirement. Unfortunately, for unstructured data this is the only way.
But structured data, like logging (log4j) files, relational-type data, and other similar, more predictable sets, can be stored in table-like structures. This is the area where Apache Hive really shines. Hive is a layer running on top of Hadoop that helps process the data in Hadoop using SQL-like queries written in Hive Query Language (HQL). While loading data into HDFS through Hive as a table, it also stores metadata describing the structure of the input data. Note that Hive needs to be installed on the Hadoop master node. Hive converts an input query into a Map-Reduce job and submits it to Hadoop, making it easy for users to analyze and process data.

Hive Prerequisites

  • Hadoop 0.20 and above
  • Java 1.6 and above
  • MySQL or Derby lightweight database in master node to store only Hive metadata
Hadoop with Hive Diagram

Advantages of Hive

  • Supports rich data types like List, Map, and Struct, apart from basic data types.
  • Provides a Web UI and a Command Line Interface for querying data. These are helpful tools for developers and learners when testing and debugging their queries.
  • The Thrift server that comes with Hive enables JDBC and ODBC connections, so any application can interact with Hive, with Hadoop as the backend database. Thrift takes care of language conversion, which allows a program in ANY language to interact with Hadoop (see the JDBC sketch after this list).
  • Even for complex structured input data, we can write our own SerDe (serializer/deserializer) programs to parse the input data, store its table structure in the metadata repository, and load the data onto the Hadoop Distributed File System (HDFS).
  • Supports queries with SQL filters, Joins, Group By, Order By, inner tables, functions, and other SQL-like operators. Using HQL we can also redirect query output into a new table. Along with all the SQL features, we can attach our own functions and Map-Reduce programs as part of an HQL query.
  • Partitions and buckets: partitioning splits data into different chunks based on input value ranges, which allows Hive to skip unwanted data while executing queries. Bucketing splits data based on a hash function. Both help to improve query performance.
  • Optimizers are being developed by Apache for Hive for better performance. We can also improve Hadoop and Hive performance by tuning a few configuration parameters based on our application requirements. To learn more, read my recent article on Hadoop and Hive Performance Tuning.
  • Hive is used by major companies like Facebook, Yahoo, and Amazon. Hadoop and Hive play a major role in the proliferation of Cloud Computing. Amazon provides S3 (Simple Storage Service) and Elastic MapReduce as services in its cloud environment; Elastic MapReduce is essentially a cloud server pre-installed with Hadoop and Hive. It allows us to load our data into Hadoop (Elastic MapReduce) and execute queries on it with the help of Hive. Amazon Elastic MapReduce is a successful product that uses Hadoop and Hive jointly. Click here to learn more about how this technology works.
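To make the Thrift/JDBC point above concrete, here is a minimal sketch of a Java client talking to Hive over JDBC. It assumes the original (HiveServer 1) JDBC driver org.apache.hadoop.hive.jdbc.HiveDriver, a Hive Thrift server listening on localhost:10000, and an illustrative table called sample_table; adjust the driver, URL, and query for your own setup.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcExample {
    public static void main(String[] args) throws Exception {
        // HiveServer 1 JDBC driver; newer HiveServer2 setups use org.apache.hive.jdbc.HiveDriver instead
        Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");

        // Hive's Thrift server is assumed to be running on the master node, port 10000
        Connection con = DriverManager.getConnection("jdbc:hive://localhost:10000/default", "", "");
        Statement stmt = con.createStatement();

        // An HQL query; Hive turns this into one or more Map-Reduce jobs behind the scenes
        ResultSet rs = stmt.executeQuery(
                "SELECT category, COUNT(*) FROM sample_table GROUP BY category");
        while (rs.next()) {
            System.out.println(rs.getString(1) + "\t" + rs.getLong(2));
        }
        con.close();
    }
}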
With more and more Hadoop distributions appearing in the “wild”, it’s clear that this project isn’t going anywhere anytime soon. If anything, it will only gain momentum as more and more companies switch to Hadoop to handle their large data repositories. Hive is a relatively mature Hadoop sub-project companion that facilitates easy data analysis, ad-hoc queries, and manipulation of large datasets stored in Hadoop. These two are a “Match Made in Heaven”!
These suggestions were observed on a Hadoop cluster running Hive queries. Please leave a comment and recommend this post by clicking the Facebook 'Like' button and '+1' at the bottom of this page.

Hadoop Performance Tuning (Hadoop-Hive)


Hadoop cluster performance tuning is a little hectic, because the Hadoop framework uses all types of resources for processing and analyzing data, so there is no single static set of parameter values that gives good performance. Parameter values should be chosen based on the following properties of the cluster:
  • Operating System
  • Processor and its number of cores
  • Memory (RAM)
  • Number of nodes in the cluster
  • Storage capacity of each node
  • Network bandwidth
  • Amount of input data
  • Number of jobs in the business logic

The recommended OS for Hadoop clusters is Linux, because Windows and other GUI-based operating systems run a lot of GUI (graphical user interface) processes that occupy most of the memory.

The storage capacity of each node should have at least 5 GB to spare after storing the distributed HDFS input data. For example, with 1 TB of input data on a 1000-node cluster, (1024 GB x 3 (replication factor)) / 1000 nodes = approximately 3 GB of distributed data per node, so it is recommended to have at least 8 GB of storage on each node, because each DataNode also writes logs and needs some space for memory swapping.

Network bandwidth of at least 100 Mbps is recommended because, as is well known, Hadoop moves a lot of data over the network while processing and loading data into HDFS. A lower-bandwidth channel will also degrade the performance of the Hadoop cluster.

The number of nodes required for a cluster depends on the amount of data to be processed and the capacity of each node. For example, a node with 2 GB of memory and a 2-core processor can process 1 GB of data in average time; it can also process 2 data blocks (of 256 MB or 512 MB) simultaneously. As an example, to process 5 TB of data and produce results in a few minutes, it is recommended to have about 1000 nodes with 4-to-8-core processors and 8-to-10 GB of memory in each node.



Hadoop Parameters:

Data block size (Chunk size): 
        The dfs.block.size parameter lives in the hdfs-site.xml file, and its value is specified in bytes. The block size should be chosen based on each node's memory capacity: if memory is small, set a smaller block size, because the TaskTracker brings a whole block of data into memory while processing it. So for 512 MB of RAM it is advisable to set the block size to 64 MB or 128 MB. On a dual-core processor the TaskTracker can process 2 blocks of data at the same time, so two data blocks will be brought into memory while processing; plan the block size accordingly, and set the concurrent TaskTracker task parameter to match.
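As a rough illustration, the same property can also be overridden per client/job through Hadoop's Configuration API instead of cluster-wide in hdfs-site.xml. This is only a sketch: the class name BlockSizeSetting and the 128 MB value are assumptions for the example, not a recommendation for every cluster.

import org.apache.hadoop.conf.Configuration;

public class BlockSizeSetting {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // dfs.block.size is expressed in bytes, as described above: here 128 MB
        conf.setLong("dfs.block.size", 128L * 1024 * 1024);
        System.out.println("Block size set to " + conf.getLong("dfs.block.size", 0) + " bytes");
    }
}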

Number of Maps and Reducer:
           The mapred.reduce.tasks and mapred.map.tasks parameters live in the mapred-site.xml file. By default, the number of maps is equal to the number of data blocks. For example, if the input data is 2 GB and the block size is 256 MB, then 8 maps will run during processing, regardless of memory capacity and the number of processors. So we need to tune this parameter toward the number of nodes times the number of cores in each node.

Number of Maps = Total number of processor cores available in the cluster.

In the example above, 8 maps are run; if the cluster has only 4 processor cores, multiple threads will start running and keep swapping data in and out of memory, which degrades the performance of the Hadoop cluster. In the same way, set the number of reducers relative to the number of cores in the cluster. After the map phase is over, most nodes go idle while a few nodes work to finish the reduce phase; to make the reduce phase complete faster, set its value to the number of nodes or the number of processor cores.
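Here is a minimal sketch of how these two values could be set for a single job with the old mapred API. The class name TaskCountSettings and the figure 8 are placeholders for "nodes x cores" and should be derived from your own cluster.

import org.apache.hadoop.mapred.JobConf;

public class TaskCountSettings {
    public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Aim for roughly (number of nodes) x (cores per node), e.g. 4 nodes x 2 cores = 8
        conf.setNumMapTasks(8);    // only a hint: the real number still follows the input splits
        conf.setNumReduceTasks(8); // this value is honored exactly
        System.out.println("maps=" + conf.getNumMapTasks()
                + ", reduces=" + conf.getNumReduceTasks());
    }
}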

Logging Level:
            Set HADOOP_ROOT_LOGGER=ERROR in the hadoop script file. By default it is set to INFO mode, in which Hadoop logs all information about events, jobs, completed tasks, I/O, warnings, and errors. Changing it will not give a huge performance improvement, but it reduces the number of log-file I/Os and gives a small boost in performance.

These suggestions were observed on a Hadoop cluster running Hive queries. Please leave a comment and recommend this post by clicking the Facebook 'Like' button and '+1' at the bottom of this page.

Big Data with Cloud Computing

What is Big Data?

Big Data usually refers to processing/analysing huge amounts of data, or data sets (terabytes, petabytes, etc.) that take a long time to process in RDBMS-type databases. Big Data projects use a lot of technologies and frameworks to process data. Google introduced the MapReduce framework in 2004, and to this day Google uses MapReduce to index the whole WWW for its search engine. A few other frameworks used for Big Data are massively parallel processing (MPP) databases, data-mining grids, the Apache Hadoop framework, and so on.

How is cloud computing related to Big Data? That is a big question.
To answer it, we just need to know how MapReduce works.

As an example, consider a scenario where you have two tables with 1 TB of data each, or say 1 billion records (1000 million) in each table. Running a query over these two tables with a complex join condition will take around 30 minutes (approximately), though this may vary depending on your database server's capability. The MapReduce framework has a strategy to handle this situation. The strategy is simple: a big task is split up and given to multiple workers, so the task gets done sooner.


MapReduce has two functions:

Map - The input data is partitioned and mapped to multiple (all) nodes in the cluster. Once a query is given, each node processes its portion and sends its respective results.

Reduce - The Reducer then combines the results from all nodes and gives the final combined result.

In the scenario above, with a 1000-node (worker) cluster, the 1 TB of data is partitioned into 256, 512, or 1024 MB data blocks and mapped to all nodes in the cluster by the mapper while loading the data. Once an operation is initiated, each node processes its data and sends the result back to the Reducer; the Reducer combines the results and returns the final answer. Here 1000 nodes are easily enough to process 1 TB of data, and the running time is reduced by a factor of roughly 500.
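To give a feel for what a Map and a Reduce function look like in code, here is the classic word-count example written against the Hadoop MapReduce API. It is only a sketch of the idea described above (counting words rather than joining tables); the class names WordCountMapper and WordCountReducer are illustrative, and the job wiring (driver class, input/output paths) is omitted for brevity.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map: each node gets a slice of the input and emits (word, 1) pairs
class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}

// Reduce: the reducer receives all the counts for one word and adds them up
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}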

But why should we use Cloud Computing for Big Data?

The main reasons are:
  • It is difficult to set up and maintain 1000 nodes just to use them for a small amount of time.
  • Chances are the data size will grow to 100 TB, 200 TB, and so on, and it is difficult to establish the required number of nodes on demand.

What is Virtualization?


Virtualization means creating a virtual form of hardware and/or software resources. Server virtualization means partitioning a physical server into several virtual servers, or machines. Each virtual machine is isolated, so it can interact with other devices, applications, users, and data independently. This makes it possible to install and run different operating systems in the virtual machines even though they run on the same physical server. Because of this isolation, if one virtual machine crashes it will not affect the others.
 Virtualization is not only about partitioning resources; it can also combine a group of physical servers into a single logical server. The main idea of virtualization is to use resources effectively, by either partitioning or combining them.

Why virtualization?
Take the example of a data center: most machines use only 10-15% of their total capacity most of the time, which results in wasted electricity and maintenance cost. To make optimal use of the remaining capacity we can use virtualization, which also avoids having to build platform- or application-specific data centers.

How is virtualization done? Who takes care of the isolation of the virtual machines (or servers)?
A hypervisor, also known as a virtualization manager, adds a new layer between the hardware and the operating system (see the picture above). The hypervisor software separates the operating system and applications from the hardware resources, and it also controls how much access each virtual machine has to hardware resources such as the processor, memory, and disk input/output.

What are the types of virtualization?
Many new types are evolving in the virtualization field nowadays; here are a few of them:
Storage virtualization -  Multiple storage resources in a network are combined into a single storage device for centralized access.
Network virtualization – Network bandwidth is split into independent channels and assigned to a particular server or device.
Application virtualization – An application is separated from the operating system so it can be relocated.
Desktop virtualization – Similar to server virtualization, but a number of virtual machines are created that users can access remotely and use like a personal computer. It makes upgrading and maintaining all desktops easier.

Example: Consider a server with 8 GB of memory, 1 TB of storage, and a 20-core processor. Using virtualization these resources can be split into 8, 10, or even 20 virtual machines. There is no requirement that each virtual machine under one server gets an equal share of resources: one virtual machine can have 1 GB of memory, 100 GB of storage, and 2 processor cores, while another has 4 GB of memory, 200 GB of storage, and 5 processor cores, depending on the requirement.

List of Vendors offering virtualization software:
  • VMware
  • Citrix
  • Microsoft Hyper-V
  • IBM PowerVM
  • RedHat Xen
  • Oracle VM Manager, etc.

We will see the relationship between virtualization and cloud computing in an upcoming post.

What is iCloud?


Following its series of products and services like iPhone, iPad, iPod, iOS, iTunes, iAd, and iBookstore, Apple Inc. has come up with a new one: iCloud, a free cloud service for its customers.




Advantages of iCloud:
Nowadays it is hectic to keep all your data updated across all your devices. iCloud keeps your data updated automatically on all of them. This is done wirelessly, over either Wi-Fi or 3G (based on the user's configuration), so users no longer need to worry about plugging in or transferring data between devices like the iPad, iPhone, and iPod. Changes made to your data on one device are automatically pushed to your other Apple devices. For example, a photo taken on your iPhone will be pushed to your PC automatically.
iCloud provides about 5 GB of free storage space to store user data and synchronize it between devices such as the iPhone, iPad, iPod, and a Mac or Windows PC, and it allows each user to configure up to 10 devices. Moreover, the space required to store music, books, and apps purchased from Apple is not counted against this 5 GB, so users get a dedicated 5 GB to store their personal Mail, Documents, and Backup data. If users need more storage space, they can extend it by paying an extra price.
The various types of data we can synchronize across our devices using iCloud services are:

iTunes with iCloud - Music a user purchases will automatically appear on all of their devices. The iTunes Store keeps a list of all music the user has purchased in the past, and the user can download it to any device if it is not available there; the full purchase history is stored in the iTunes Store no matter which device the music was bought from. Along with iCloud, Apple is also releasing the 'iTunes Match' service, which covers the user's whole music collection, including music ripped from DVDs/CDs or purchased somewhere other than iTunes. This other music is also shared between all devices: your music files are matched against the iCloud music library of 18 million tracks, and anything that matches can be used from iCloud directly; you only have to upload the files that do not match. When a track matches, the user can listen to the iCloud copy at 256 Kbps iTunes Plus quality, even if their own file was lower quality. iTunes Match is not a free service; it is available for $24.99 a year. The prerequisite to use iCloud with iTunes is iTunes version 10.3, which is available as a free download for Mac and PC.



Contacts, Calendar, and Mail – With Apple's former service MobileMe, users could share a calendar with the people they chose, along with instant mail on all iOS devices and PCs. The iCloud service for Contacts, Calendar, and Mail lets users access their contact book, schedule, and mail from anywhere, avoiding the problem of having to carry a laptop around just because all the records are kept on it. Also, if a change is made by one member of a shared group, it automatically updates every other member's schedule.




Documents – As Apple has given the iWork apps to developers to make their applications work with iCloud, applications you bought from third-party vendors can be used along with iCloud. So you can create documents on any iOS device and check your work from anywhere; all changes made to documents are automatically propagated to all devices. You can even check your office documents on an iPhone during vacation.

App Store, iBookstore
 – It enables users to install and use any of their purchased apps and books from any of their iOS devices. The store keeps the history of the user's entire purchase list, so we can download from the store if an app or book is not available on our device. iCloud also pushes newly bought apps to all other devices. For example, a user can view on a PC the books that are available on their iPhone. Even bookmarks, highlighted text, comments, and the page the user was on when closing the application are updated on all other devices through the iCloud service. For example, if a user is reading a book on an iPod and closes the application at page 36, they can pick up from the same page (page 36) whenever they re-open that book on a Mac or PC.



iCloud Backup 
- It backs up all devices automatically, securely, and daily. It backs up all music, apps, app data, books, the camera roll (pictures and videos), device settings, home screen and app organization, text messages, MMS, and even ringtones. The backup data is always safe, even if something happens to the device accidentally; in that case we can recover the backed-up data from the damaged device to a new device just by entering our Apple username and password. iCloud Backup is quick because it backs up only the data that has changed, not everything from scratch each time.

Photo Stream in iCloud
 – Using the iCloud service, users can transfer photos from the iPhone to any other device and no longer need to worry about a limit on taking photos/videos. The iCloud Photo Stream feature moves photos into the PC's pictures folder. Apart from the PC, each device stores only the last 1000 photos, since devices have space constraints. iCloud itself also stores all photos for 30 days, in case we do not connect any other device to iCloud during that time.
Prerequisites to use the iCloud features:
  • iTunes 10.3
  • iOS 4.3.3
  • Mac OS X Lion
  • Window Vista or Window 7
  • Outlook 2007 or 2010

What is Cloud Computing?

Cloud Computing has become the next big thing in the world of computers. In this blog I am going to share, in simple terms, the basics of what cloud computing is and where/how it is being used.

Now , what is "Cloud Computing ". The basic definition of it can be derived by parsing this term "Cloud Computing" into words as Cloud & Computing.

Cloud: Generally we use the cloud symbol to represent the Internet, which is a collection of web servers, file servers, supercomputers, printers, and so on. Some resources are available somewhere in the network, and we can access and make use of them based on the privileges we hold.

Computing: This tells us explicitly that we are computing/processing something on the cloud.
From this we can understand that the main functionality of cloud computing is the usage of resources over a network, as a service, on an on-demand basis.
We can take a web server as one of the simplest examples of cloud computing. As a user, we send a request to the web server to access a particular page for a certain purpose. The web server receives our request, computes/processes it based on the specification we have given, and sends the response back to us. I mention this example only for basic understanding; in reality cloud computing is much more than that.







The next question that comes up is: is it only about computing over the cloud?
No, it is about the usage of any resource over the network. It might be:

  • Storage, to store our data in a centralized location like a file server
  • Database, to store data in an RDBMS without buying the whole licensed software, paying only for usage
  • Platform, to deploy and run our application
  • Software, to use over the network, like Google Docs
  • Infrastructure, etc.
One of the key features of Cloud Computing is Utility Computing/On-Demand Computing. Nowadays most cloud providers distribute a lot of resources on a utility computing basis.

Utility computing means we are obligated to pay only for what we use/consume. For example, we might pay $0.14 for usage in the first 1 TB/month tier and $0.125 for the next 49 TB/month, and so on; the exact rates depend on the service provider.

Cloud computing avoids capital expenditure on hardware and software. How?
The main reason IT industries now depend on cloud computing is to avoid capital expenditure on hardware and software for their projects.

For example, before starting any innovative project, a POC (Proof of Concept), i.e. a feasibility check, is done. The basic requirements of any project are hardware and licensed software, but in a project where success is not guaranteed the risk of loss is high. This problem is solved in cloud computing by the utility computing feature, which ensures that the investment and expenses on the project are always proportional to the usage.

In upcoming posts we will see what types of resources are available as a service in Cloud Computing.

Friday, January 18, 2013

Understanding the Parallelism of a Storm Topology


In the past few days I have been test-driving Twitter’s Storm project, which is a distributed real-time data processing platform. One of my findings so far has been that the quality of Storm’s documentation and example code is pretty good – it is very easy to get up and running with Storm. Big props to the Storm developers! At the same time, I found the sections on how a Storm topology runs in a cluster not perfectly clear, and learned that the recent releases of Storm changed some of its behavior in a way that is not yet fully reflected in the Storm wiki and in the API docs.
In this article I want to share my own understanding of the parallelism of a Storm topology after reading the documentation and writing some first prototype code. More specifically, I describe the relationships of worker processes, executors (threads) and tasks, and how you can configure them according to your needs. This article is based on Storm release 0.8.1, the latest version as of October 2012.

What is Storm?

For those readers unfamiliar with Storm here is a brief description taken from its homepage:
Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!
Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.

What makes a running topology: worker processes, executors and tasks

Storm distinguishes between the following three main entities that are used to actually run a topology in a Storm cluster:
  • Worker processes
  • Executors (threads)
  • Tasks
Here is a simple illustration of their relationships:
Figure 1: The relationships of worker processes, executors (threads) and tasks in Storm
A worker process executes a subset of a topology. A worker process belongs to a specific topology and may run one or more executors for one or more components (spouts or bolts) of this topology. A running topology consists of many such processes running on many machines within a Storm cluster.
An executor is a thread that is spawned by a worker process. It may run one or more tasks for the same component (spout or bolt).
A task performs the actual data processing – each spout or bolt that you implement in your code executes as many tasks across the cluster. The number of tasks for a component is always the same throughout the lifetime of a topology, but the number of executors (threads) for a component can change over time. This means that the following condition holds true: #threads <= #tasks. By default, the number of tasks is set to be the same as the number of executors, i.e. Storm will run one task per thread.

Configuring the parallelism of a topology

Note that in Storm’s terminology “parallelism” is specifically used to describe the so-called parallelism hint, which means the initial number of executors (threads) of a component. In this article though I use the term “parallelism” in a more general sense to describe how you can configure not only the number of executors but also the number of worker processes and the number of tasks of a Storm topology. I will specifically call out when “parallelism” is used in the narrow definition of Storm.

The following overview describes the various configuration options and how to set them in your code. There is more than one way of setting these options though, and the overview lists only some of them. Storm currently has the following order of precedence for configuration settings: defaults.yaml > storm.yaml > topology-specific configuration > internal component-specific configuration > external component-specific configuration. Please take a look at the Storm documentation for more details.
Number of worker processes
  Description: How many worker processes to create for the topology across machines in the cluster.
  Configuration option: TOPOLOGY_WORKERS
  How to set in your code (examples): Config#setNumWorkers

Number of executors (threads)
  Description: How many executors to spawn per component.
  Configuration option: ?
  How to set in your code (examples): TopologyBuilder#setSpout() and TopologyBuilder#setBolt(). Note that as of Storm 0.8 the parallelism_hint parameter now specifies the initial number of executors (not tasks!) for that bolt.

Number of tasks
  Description: How many tasks to create per component.
  Configuration option: TOPOLOGY_TASKS
  How to set in your code (examples): ComponentConfigurationDeclarer#setNumTasks()
Here is an example code snippet to show these settings in practice:
Configuring the parallelism of a Storm bolt
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");
In the above code we configured Storm to run the bolt GreenBolt with an initial number of two executors and four associated tasks. Storm will run two tasks per executor (thread). If you do not explicitly configure the number of tasks, Storm will run by default one task per executor.

Example of a running topology

The following illustration shows what a simple topology looks like in operation. The topology consists of three components: one spout called BlueSpout and two bolts called GreenBolt and YellowBolt. The components are linked such that BlueSpout sends its output to GreenBolt, which in turn sends its own output to YellowBolt.
Figure 2: Example of a running topology
The GreenBolt was configured as per the code snippet above whereas BlueSpout and YellowBolt only set the parallelism hint (number of executors). Here is the relevant code:
Configuring the parallelism of a simple Storm topology
Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // parallelism hint

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology()
    );
And of course Storm comes with additional configuration settings to control the parallelism of a topology, including:
  • TOPOLOGY_MAX_TASK_PARALLELISM: This setting puts a ceiling on the number of executors that can be spawned for a single component. It is typically used during testing to limit the number of threads spawned when running a topology in local mode. You can set this option via e.g. Config#setMaxTaskParallelism().
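As a small illustration of that setting, here is a sketch of how it might be used when testing a topology in local mode. It continues the earlier snippets, so the topologyBuilder variable is assumed to be the one defined above, and LocalCluster is Storm's in-process test runner.

Config conf = new Config();
conf.setMaxTaskParallelism(3); // cap the number of executors per component while testing

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, topologyBuilder.createTopology());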
Update Oct 18: Nathan Marz informed me that TOPOLOGY_OPTIMIZE will be removed in a future release. I have therefore removed its entry from the configuration list above.

How to change the parallelism of a running topology

A nifty feature of Storm is that you can increase or decrease the number of worker processes and/or executors without being required to restart the cluster or the topology. The act of doing so is called rebalancing.
You have two options to rebalance a topology:
  1. Use the Storm web UI to rebalance the topology.
  2. Use the CLI tool storm rebalance as described below.
Here is an example of using the CLI tool:
 # Reconfigure the topology "mytopology" to use 5 worker processes,
 # the spout "blue-spout" to use 3 executors and
 # the bolt "yellow-bolt" to use 10 executors.

 $ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

References for this article

To compile this article (and to write my related test code) I used information primarily from the following sources: