Friday, January 18, 2013

Analyzing Twitter Data with Apache Hadoop


Social media has gained immense popularity with marketing teams, and Twitter is an effective tool for a company to get people excited about its products. Twitter makes it easy to engage users and communicate directly with them, and in turn, users can provide word-of-mouth marketing for companies by discussing the products. Given limited resources, and knowing we may not be able to talk to everyone we want to target directly, marketing departments can be more efficient by being selective about whom we reach out to.
In this post, we’ll learn how we can use Apache Flume, Apache HDFS, Apache Oozie, and Apache Hive to design an end-to-end data pipeline that will enable us to analyze Twitter data. This will be the first post in a series. The posts to follow will describe, in more depth, how each component is involved and how the custom code operates. All the code and instructions necessary to reproduce this pipeline are available on the Cloudera GitHub.

Who is Influential?

To understand whom we should target, let’s take a step back and try to understand the mechanics of Twitter. A user – let’s call him Joe – follows a set of people, and has a set of followers. When Joe sends an update out, that update is seen by all of his followers. Joe can also retweet other users’ updates. A retweet is a repost of an update, much like you might forward an email. If Joe sees a tweet from Sue, and retweets it, all of Joe’s followers see Sue’s tweet, even if they don’t follow Sue. Through retweets, messages can get passed much further than just the followers of the person who sent the original tweet. Knowing that, we can try to engage users whose updates tend to generate lots of retweets. Since Twitter tracks retweet counts for all tweets, we can find the users we’re looking for by analyzing Twitter data.
Now we know the question we want to ask: Which Twitter users get the most retweets? Who is influential within our industry?

How Do We Answer These Questions?

SQL queries can be used to answer this question: We want to look at which users are responsible for the most retweets, in descending order of most retweeted. However, querying Twitter data in a traditional RDBMS is inconvenient, since the Twitter Streaming API outputs tweets in a JSON format that can be arbitrarily complex. In the Hadoop ecosystem, the Hive project provides a query interface for data that resides in HDFS. The query language looks very similar to SQL, but it also lets us model complex types, so we can query the kind of nested data we have. Seems like a good place to start. So how do we get Twitter data into Hive? First, we need to get Twitter data into HDFS, and then we’ll be able to tell Hive where the data resides and how to read it.

The diagram above shows a high-level view of how some of the CDH (Cloudera’s Distribution Including Apache Hadoop) components can be pieced together to build the data pipeline we need to answer the questions we have. The rest of this post will describe how these components interact and the purposes they each serve.

Gathering Data with Apache Flume

The Twitter Streaming API will give us a constant stream of tweets coming from the service. One option would be to use a simple utility like curl to access the API and then periodically load the files. However, this would require us to write code to control where the data goes in HDFS, and if we have a secure cluster, we will have to integrate with security mechanisms. It will be much simpler to use components within CDH to automatically move the files from the API to HDFS, without our manual intervention.
Apache Flume is a data ingestion system that is configured by defining the endpoints of a data flow, called sources and sinks. In Flume, each individual piece of data (a tweet, in our case) is called an event; sources produce events and send them through a channel, which connects the source to the sink. The sink then writes the events out to a predefined location. Flume supports some standard data sources, such as syslog or netcat. For this use case, we’ll need to design a custom source that accesses the Twitter Streaming API and sends the tweets through a channel to a sink that writes to HDFS files. Additionally, we can use the custom source to filter the tweets on a set of search keywords to help identify relevant tweets, rather than taking a pure sample of the entire Twitter firehose. The custom Flume source code can be found here.
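To make the source, channel, and sink roles concrete, here is a toy model in Python. It is only a sketch of the data flow and does not use Flume's actual API: the function names, the in-memory queue standing in for a channel, the list standing in for HDFS files, and the two keywords are all assumptions made for illustration. A real source may also delegate keyword filtering to the Streaming API rather than filtering locally.

```python
import json
import queue

# Hypothetical keyword list, chosen only for this illustration.
KEYWORDS = ["hadoop", "bigdata"]

def matches_keywords(tweet, keywords):
    """Return True if the tweet text mentions any keyword (case-insensitive)."""
    text = tweet.get("text", "").lower()
    return any(keyword in text for keyword in keywords)

def run_source(raw_tweets, channel, keywords):
    """Source: wrap each relevant tweet as an event and put it on the channel."""
    for tweet in raw_tweets:
        if matches_keywords(tweet, keywords):
            channel.put(json.dumps(tweet))

def run_sink(channel, output_lines):
    """Sink: drain the channel and append each event to the output."""
    while not channel.empty():
        output_lines.append(channel.get())

# Made-up tweets standing in for the live stream.
stream = [
    {"text": "Learning #Hadoop this week"},
    {"text": "What a nice sunset"},
    {"text": "New #bigdata meetup announced"},
]

channel = queue.Queue()   # stands in for a Flume channel
written = []              # stands in for files on HDFS
run_source(stream, channel, KEYWORDS)
run_sink(channel, written)
print(len(written))       # number of events that reached the sink
```

The point of the separation is that the source and the sink never talk to each other directly; the channel buffers events between them.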

Partition Management with Oozie

Once we have the Twitter data loaded into HDFS, we can stage it for querying by creating an external table in Hive. Using an external table will allow us to query the table without moving the data from the location where it ends up in HDFS. To ensure scalability, as we add more and more data, we’ll need to also partition the table. A partitioned table allows us to prune the files that we read when querying, which results in better performance when dealing with large data sets. However, the Twitter API will continue to stream tweets and Flume will perpetually create new files. We can automate the periodic process of adding partitions to our table as the new data comes in.
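The effect of partition pruning can be sketched in a few lines of Python. This is an illustration of the idea, not of Hive internals: the hourly key format (YYYYMMDDHH), the directory layout, and the file names are assumptions invented for the example.

```python
from datetime import datetime

def partition_key(created_at):
    """Map a tweet's created_at string to an hourly partition key (YYYYMMDDHH)."""
    ts = datetime.strptime(created_at, "%a %b %d %H:%M:%S %z %Y")
    return int(ts.strftime("%Y%m%d%H"))

# Hypothetical layout: each hourly partition points at its own files.
partitions = {
    2012091020: ["/user/flume/tweets/2012/09/10/20/FlumeData.1"],
    2012091021: ["/user/flume/tweets/2012/09/10/21/FlumeData.2"],
    2012091022: ["/user/flume/tweets/2012/09/10/22/FlumeData.3"],
}

def files_to_scan(partitions, first_hour, last_hour):
    """Keep only the files whose partition key falls inside the queried range."""
    return [path
            for hour, paths in sorted(partitions.items())
            if first_hour <= hour <= last_hour
            for path in paths]

key = partition_key("Mon Sep 10 21:19:23 +0000 2012")
selected = files_to_scan(partitions, key, key)
print(key, selected)
```

A query restricted to a single hour only has to read that hour's files; the other partitions are never opened.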
Apache Oozie is a workflow coordination system that can be used to solve this problem. Oozie is an extremely flexible system for designing job workflows, which can be scheduled to run based on a set of criteria. We can configure the workflow to run an ALTER TABLE command that adds a partition containing the last hour’s worth of data into Hive, and we can schedule the workflow to run every hour. This will ensure that we’re always looking at up-to-date data.
The configuration files for the Oozie workflow are located here.
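As a rough illustration of the hourly step described above, the following Python sketch builds the kind of ALTER TABLE statement such a workflow might run. It is not taken from the actual Oozie configuration: the table name `tweets`, the partition column `datehour`, and the base directory are assumptions made for this example.

```python
from datetime import datetime, timedelta, timezone

def add_partition_statement(now, table="tweets", base_dir="/user/flume/tweets"):
    """Build an ALTER TABLE statement for the hour preceding `now`.

    The table name, the partition column `datehour`, and the directory
    layout are hypothetical; adjust them to match the real table.
    """
    previous_hour = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
    key = previous_hour.strftime("%Y%m%d%H")
    location = f"{base_dir}/{previous_hour:%Y/%m/%d/%H}"
    return (f"ALTER TABLE {table} ADD IF NOT EXISTS "
            f"PARTITION (datehour = {key}) LOCATION '{location}';")

# Pretend the workflow fires a few minutes past 22:00 UTC.
run_time = datetime(2012, 9, 10, 22, 5, tzinfo=timezone.utc)
statement = add_partition_statement(run_time)
print(statement)
```

Using IF NOT EXISTS keeps the step safe to rerun if the same hour is processed twice.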

Querying Complex Data with Hive

Before we can query the data, we need to ensure that the Hive table can properly interpret the JSON data. By default, Hive expects that input files use a delimited row format, but our Twitter data is in a JSON format, which will not work with the defaults. Handling cases like this is actually one of Hive’s biggest strengths. Hive allows us to flexibly define, and redefine, how the data is represented on disk. The schema is only really enforced when we read the data, and we can use the Hive SerDe interface to specify how to interpret what we’ve loaded.
SerDe stands for Serializer and Deserializer, which are interfaces that tell Hive how it should translate the data into something that Hive can process. In particular, the Deserializer interface is used when we read data off of disk, and converts the data into objects that Hive knows how to manipulate. We can write a custom SerDe that reads the JSON data in and translates the objects for Hive. Once that’s put into place, we can start querying. The JSON SerDe code can be found here. The SerDe will take a tweet in JSON form, like the following:
{
   "retweeted_status": {
      "contributors": null,
      "text": "#Crowdsourcing – drivers already generate traffic data for your smartphone to suggest alternative routes when a road is clogged. #bigdata",
      "geo": null,
      "retweeted": false,
      "in_reply_to_screen_name": null,
      "truncated": false,
      "entities": {
         "urls": [],
         "hashtags": [
            {
               "text": "Crowdsourcing",
               "indices": [
                  0,
                  14
               ]
            },
            {
               "text": "bigdata",
               "indices": [
                  129,
                  137
               ]
            }
         ],
         "user_mentions": []
      },
      "in_reply_to_status_id_str": null,
      "id": 245255511388336128,
      "in_reply_to_user_id_str": null,
      "source": "SocialOomph",
      "favorited": false,
      "in_reply_to_status_id": null,
      "in_reply_to_user_id": null,
      "retweet_count": 0,
      "created_at": "Mon Sep 10 20:20:45 +0000 2012",
      "id_str": "245255511388336128",
      "place": null,
      "user": {
         "location": "Oregon, ",
         "default_profile": false,
         "statuses_count": 5289,
         "profile_background_tile": false,
         "lang": "en",
         "profile_link_color": "627E91",
         "id": 347471575,
         "following": null,
         "protected": false,
         "favourites_count": 17,
         "profile_text_color": "D4B020",
         "verified": false,
         "description": "Dad, Innovator, Sales Professional. Project Management Professional (PMP).  Soccer Coach,  Little League Coach  #Agile #PMOT - views are my own -",
         "contributors_enabled": false,
         "name": "Scott Ostby",
         "profile_sidebar_border_color": "404040",
         "profile_background_color": "0F0F0F",
         "created_at": "Tue Aug 02 21:10:39 +0000 2011",
         "default_profile_image": false,
         "followers_count": 19005,
         "profile_image_url_https": "https://si0.twimg.com/profile_images/1928022765/scott_normal.jpg",
         "geo_enabled": true,
         "profile_background_image_url": "http://a0.twimg.com/profile_background_images/327807929/xce5b8c5dfff3dc3bbfbdef5ca2a62b4.jpg",
         "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/327807929/xce5b8c5dfff3dc3bbfbdef5ca2a62b4.jpg",
         "follow_request_sent": null,
         "url": "http://facebook.com/ostby",
         "utc_offset": -28800,
         "time_zone": "Pacific Time (US & Canada)",
         "notifications": null,
         "friends_count": 13172,
         "profile_use_background_image": true,
         "profile_sidebar_fill_color": "1C1C1C",
         "screen_name": "ScottOstby",
         "id_str": "347471575",
         "profile_image_url": "http://a0.twimg.com/profile_images/1928022765/scott_normal.jpg",
         "show_all_inline_media": true,
         "is_translator": false,
         "listed_count": 45
      },
      "coordinates": null
   },
   "contributors": null,
   "text": "RT @ScottOstby: #Crowdsourcing – drivers already generate traffic data for your smartphone to suggest alternative routes when a road is  ...",
   "geo": null,
   "retweeted": false,
   "in_reply_to_screen_name": null,
   "truncated": false,
   "entities": {
      "urls": [],
      "hashtags": [
         {
            "text": "Crowdsourcing",
            "indices": [
               16,
               30
            ]
         }
      ],
      "user_mentions": [
         {
            "id": 347471575,
            "name": "Scott Ostby",
            "indices": [
               3,
               14
            ],
            "screen_name": "ScottOstby",
            "id_str": "347471575"
         }
      ]
   },
   "in_reply_to_status_id_str": null,
   "id": 245270269525123072,
   "in_reply_to_user_id_str": null,
   "source": "web",
   "favorited": false,
   "in_reply_to_status_id": null,
   "in_reply_to_user_id": null,
   "retweet_count": 0,
   "created_at": "Mon Sep 10 21:19:23 +0000 2012",
   "id_str": "245270269525123072",
   "place": null,
   "user": {
      "location": "",
      "default_profile": true,
      "statuses_count": 1294,
      "profile_background_tile": false,
      "lang": "en",
      "profile_link_color": "0084B4",
      "id": 21804678,
      "following": null,
      "protected": false,
      "favourites_count": 11,
      "profile_text_color": "333333",
      "verified": false,
      "description": "",
      "contributors_enabled": false,
      "name": "Parvez Jugon",
      "profile_sidebar_border_color": "C0DEED",
      "profile_background_color": "C0DEED",
      "created_at": "Tue Feb 24 22:10:43 +0000 2009",
      "default_profile_image": false,
      "followers_count": 70,
      "profile_image_url_https": "https://si0.twimg.com/profile_images/2280737846/ni91dkogtgwp1or5rwp4_normal.gif",
      "geo_enabled": false,
      "profile_background_image_url": "http://a0.twimg.com/images/themes/theme1/bg.png",
      "profile_background_image_url_https": "https://si0.twimg.com/images/themes/theme1/bg.png",
      "follow_request_sent": null,
      "url": null,
      "utc_offset": null,
      "time_zone": null,
      "notifications": null,
      "friends_count": 299,
      "profile_use_background_image": true,
      "profile_sidebar_fill_color": "DDEEF6",
      "screen_name": "ParvezJugon",
      "id_str": "21804678",
      "profile_image_url": "http://a0.twimg.com/profile_images/2280737846/ni91dkogtgwp1or5rwp4_normal.gif",
      "show_all_inline_media": false,
      "is_translator": false,
      "listed_count": 7
   },
   "coordinates": null
}
and translate the JSON entities into queryable columns:
SELECT created_at, entities, text, user
FROM tweets
WHERE user.screen_name='ParvezJugon'
  AND retweeted_status.user.screen_name='ScottOstby';
  
which will result in:
created_at: Mon Sep 10 21:19:23 +0000 2012
entities: {"urls":[],"user_mentions":[{"screen_name":"ScottOstby","name":"Scott Ostby"}],"hashtags":[{"text":"Crowdsourcing"}]}
text: RT @ScottOstby: #Crowdsourcing – drivers already generate traffic data for your smartphone to suggest alternative routes when a road is  ...
user: {"screen_name":"ParvezJugon","name":"Parvez Jugon","friends_count":299,"followers_count":70,"statuses_count":1294,"verified":false,"utc_offset":null,"time_zone":null}
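The deserialization step can be sketched in Python to show the idea of turning one line of JSON into a row with nested, queryable fields. This is not the SerDe's actual code, which implements Hive's Deserializer interface; the column subset, the helper names, and the trimmed-down tweet are assumptions for illustration.

```python
import json

# Hypothetical subset of columns; a real table would define many more.
COLUMNS = ["created_at", "text", "user", "retweeted_status"]

def deserialize(line, columns=COLUMNS):
    """Turn one line of JSON into a row dict; absent fields become None."""
    record = json.loads(line)
    return {name: record.get(name) for name in columns}

def field(row, path):
    """Follow a dotted path such as 'retweeted_status.user.screen_name'."""
    value = row
    for part in path.split("."):
        if value is None:
            return None
        value = value.get(part)
    return value

# A heavily trimmed stand-in for the tweet shown earlier.
raw = json.dumps({
    "created_at": "Mon Sep 10 21:19:23 +0000 2012",
    "text": "RT @ScottOstby: #Crowdsourcing ...",
    "user": {"screen_name": "ParvezJugon"},
    "retweeted_status": {"user": {"screen_name": "ScottOstby"}},
})

row = deserialize(raw)
print(field(row, "user.screen_name"))
print(field(row, "retweeted_status.user.screen_name"))
```

Fields that are missing from a record simply come back as None, which mirrors how a tweet that is not a retweet has no retweeted_status to query.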
We’ve now managed to put together an end-to-end system, which gathers data from the Twitter Streaming API, sends the tweets to files on HDFS through Flume, and uses Oozie to periodically load the files into Hive, where we can query the raw JSON data, through the use of a Hive SerDe.

Some Results

In my own testing, I let Flume collect data for about three days, filtering on a set of keywords: 
 
hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
The collected data was about half a GB of JSON data, and here is an example of what a tweet looks like. The data has some structure, but certain fields may or may not exist. The retweeted_status field, for example, will only be present if the tweet was a retweet. Additionally, some of the fields may be arbitrarily complex. The hashtags field is an array of all the hashtags present in the tweet, but most RDBMSs do not support arrays as a column type. This semi-structured quality makes the data very difficult to query in a traditional RDBMS. Hive can handle this data much more gracefully.
The query below will find usernames, and the number of retweets they have generated across all the tweets that we have data for:
SELECT
  t.retweeted_screen_name,
  sum(retweets) AS total_retweets,
  count(*) AS tweet_count
FROM (SELECT
        retweeted_status.user.screen_name AS retweeted_screen_name,
        retweeted_status.text,
        max(retweet_count) AS retweets
      FROM tweets
      GROUP BY retweeted_status.user.screen_name,
               retweeted_status.text) t
GROUP BY t.retweeted_screen_name
ORDER BY total_retweets DESC
LIMIT 10;
For the few days of data, I found that these were the most retweeted users for the industry:
  retweeted_screen_name  total_retweets  tweet_count
  mauricefreedman        493             1
  HarvardBiz             362             6
  TechCrunch             314             7
  googleanalytics        244             10
  BigDataBorat           201             6
  stephen_wolfram        182             1
  CloudExpo              153             28
  TheNextWeb             150             1
  GonzalezCarmen         121             10
  IBMbigdata             100             37
  
From these results, we can see whose tweets are getting heard by the widest audience, and also determine whether these people are communicating on a regular basis or not. We can use this information to more carefully target our messaging in order to get them talking about our products, which, in turn, will get other people talking about our products.
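To make the logic of the retweet query above concrete, here is the same two-level aggregation written out in Python over a handful of made-up tweets. It is a sketch, not a replacement for the Hive query: the sample data and screen names are invented, and unlike the query it skips tweets that are not retweets instead of grouping them under a null name.

```python
from collections import defaultdict

def top_retweeted(tweets, limit=10):
    """Rank original authors by summed retweet counts, like the query above."""
    # Inner query: highest retweet count seen per (original author, text).
    best = {}
    for tweet in tweets:
        original = tweet.get("retweeted_status")
        if original is None:
            continue  # simplification: ignore tweets that are not retweets
        key = (original["user"]["screen_name"], original["text"])
        best[key] = max(best.get(key, 0), tweet["retweet_count"])

    # Outer query: sum those maxima and count distinct tweets per author.
    totals = defaultdict(lambda: [0, 0])  # name -> [total_retweets, tweet_count]
    for (screen_name, _text), retweets in best.items():
        totals[screen_name][0] += retweets
        totals[screen_name][1] += 1

    ranked = sorted(((name, total, count)
                     for name, (total, count) in totals.items()),
                    key=lambda r: r[1], reverse=True)
    return ranked[:limit]

# Invented sample data, for illustration only.
sample = [
    {"retweet_count": 3, "retweeted_status": {"text": "a", "user": {"screen_name": "alice"}}},
    {"retweet_count": 5, "retweeted_status": {"text": "a", "user": {"screen_name": "alice"}}},
    {"retweet_count": 2, "retweeted_status": {"text": "b", "user": {"screen_name": "alice"}}},
    {"retweet_count": 4, "retweeted_status": {"text": "c", "user": {"screen_name": "bob"}}},
    {"retweet_count": 0},
]

result = top_retweeted(sample)
print(result)
```

Taking the maximum first matters because the same original tweet shows up once per retweet we collected; summing the raw counts would count it many times over.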

