Introducing Apache Kafka – Part One



style="display:inline-block;width:728px;height:90px"
data-ad-client="ca-pub-1838053042931493"
data-ad-slot="3431401963">

What is Apache Kafka?

From http://kafka.apache.org/

The Apache Kafka project page defines Apache Kafka as publish-subscribe messaging rethought as a distributed commit log. Kafka is a high-throughput distributed messaging system.

The Kafka project page is probably the best source of information for anyone interested in Kafka and its inner workings. However, the above definition is more helpful to readers who already understand what a commit log is, how messaging systems work, and have a basic idea of distributed systems.

In this post, my attempt is to explain these concepts in a simple way and make them understandable to anyone interested in the basic architecture and workings of Apache Kafka. So, let's get started.

What is a message?

A message in its literal sense is some information that needs to go from a source to a destination. When this information is sent from the source and received by the destination, we say that the message has been delivered. For example, consider that you and your friend use a long hollow pipe to talk to each other. When your friend speaks into it from one end, you keep your ear to the other end to hear what he just said. In this way you receive the messages he wants to convey.


However, one thing to note here is that you and your friend should synchronize the activity of speaking and hearing, i.e. if your friend is speaking into the pipe, you at the other end should be ready to listen. If you are not ready, the message is lost.

What is a message in the context of a messaging system?

In its simplest form, a message is nothing but data. A system that can handle the transmission of data from a source to a destination is called a messaging system. In the computing world, the source could be a machine that generates data (for example, a web server generating large numbers of log entries) or a human user. In most cases you will see that the volumes generated by systems and machines are far larger than those generated by human beings.

[Figure: messages flowing from a source to a destination]

In a messaging system you would want to ensure that the message is delivered to the target. Even if the target is not ready to receive it, there has to be a provision to hold it. To achieve this, the messaging system should provide a way of retaining the message in the medium of communication until the target is ready to receive it.

A messaging system with the ability to send, hold and deliver messages already seems like a good system. In reality, however, the messages may have to be delivered to more than one destination. This introduces the need to hold the messages until they are consumed by all the destinations. It also brings forward one more characteristic of a messaging system: the source is only responsible for delivering the messages to the communication channel/medium (think of the pipe), and the destination is responsible for reading the messages from that medium. The messages can wait on the channel, and the different destinations can read them at their own pace.
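To make the idea of destinations reading at their own pace a little more concrete, here is a minimal sketch in Java of the channel as an append-only list of messages, where every destination simply remembers how far it has read. This is only an illustration of the concept described above, not Kafka code:

import java.util.ArrayList;
import java.util.List;

// Minimal illustration only: an append-only channel with independent readers.
class MessageChannel {
    private final List<String> messages = new ArrayList<>();

    // Sources only ever append; the position of the new message is returned.
    int append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Each destination keeps its own position (offset) and reads forward
    // from it whenever it is ready, independently of the other destinations.
    List<String> readFrom(int offset) {
        return new ArrayList<>(messages.subList(offset, messages.size()));
    }
}

Two destinations with different offsets see exactly the same messages; one is simply further along than the other.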

As there could be many destinations, there could also be several sources that write to the medium.

[Figure: multiple sources writing to the channel and multiple destinations reading from it]

With this understanding, let's give some technical labels to the different components in a messaging system.

Publisher

The source, or the different sources, create messages. Let's call them publishers.

Subscriber

The destinations/targets are the ones who read the messages; in other words, they subscribe to the messages generated by the publishers. Let's call them subscribers.

There is one more component that we still need to name. So far, we have been referring to it as the channel/medium to which publishers write data and from which subscribers read data. A more appropriate name for it is the log. Why a log?

Logs, in a general sense stand for a record of “what happened”. In our context the message can be classified as an event and the log is what stores that event. Publishers publish events to the log and the subscribers read these events from the log to know “what happened”. Following is a simple illustration of how a log appears.

[Figure: the log, an append-only sequence of numbered records ordered by time]

As you can see above, the log is a sequence of numbered messages that is append only and ordered by time. In the database world, the log is often referred to as a commit log. Explaining how logs work in depth is not the intention of this post; however, if you are interested in knowing more about logs, you should read the following blog post by Jay Kreps:

http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying

This is one of the best technical posts I have read. It is like a mini-course on distributed systems and is extremely well written. You should also check out his video here – http://youtu.be/aJuo_bLSW6s

In this post we have discussed the following:

  • Messaging system
  • Publishers
  • Subscribers
  • Commit Log

In the next post, we will get a bit deeper into understanding the above terms with respect to Apache Kafka and how they all work together. We will also see how Apache Kafka maintains a distributed commit log across a cluster making it a reliable distributed messaging system.


style="display:inline-block;width:728px;height:90px"
data-ad-client="ca-pub-1838053042931493"
data-ad-slot="3431401963">

Frequently Asked Questions (Hadoop)

It has been a while since my last post, and over that period I have received several questions via comments on my different posts. Almost all of them are related to Hadoop, so I thought I would start this year with a post answering them. Many readers ask questions I have already answered in earlier responses, so I have gone through all of them and collected a set of frequently asked questions, answered in one place. Hopefully this will save my readers from having to dig through all the post comments and my replies. I hope this helps.

Q: I am from XYZ background/technology/domain. Can I learn Hadoop?

A: Hadoop is an open source framework that is used by many organizations to solve the problem of processing large amounts of data. It is a technology just like any other, but it is gaining attention because of its applications and widespread adoption. You can come from any background, technology or domain. If you are interested, nothing should really stop you from learning Hadoop. For some it may take less time to pick up, for others it may take longer. It all depends on whether the technology interests you and whether you are willing to put in the effort and persistence required to learn it. Learning Hadoop is just like learning any other framework or technology.

Q: What is the future of Hadoop?

A: The present and the future of Hadoop is bright. I say this because the ecosystem, i.e. the applications that are being built on top of Hadoop are catering to almost every kind of use-case. Also the adoption of Hadoop by organizations within their core data teams is on the increase. The more the adoption, the better the future.

Q: I don’t have a technical background; can I still pick up Hadoop?

A: You can be from any background. Hadoop is not rocket science. It is vast and the ecosystem is growing every day. Having said that, the community has done a pretty good job of documenting the workings and applications of Hadoop on the respective Apache project pages.

Q: I don’t know Java, but I am good at X, Y and Z, will Hadoop be the right choice for me?

A: You don’t need to know Java to learn Hadoop. Hadoop is the right choice only if you actually want to learn Hadoop. If you have an inclination or an itch to try out Hadoop, go ahead and try it. You should be a good judge of whether you are enjoying it or not.

Q: Can you suggest good books to learn Hadoop?

A: There are a few books out there; you should be able to Google them and read the reviews. I would recommend Hadoop: The Definitive Guide.

Q: Where can I practice Hadoop?

A: If you have a computer with a good amount of RAM (around 8GB or more) and a decent CPU, you can spin up a few virtual machines and install Hadoop on the cluster of VMs. You could also download one of the various pre-installed Hadoop VMs available.

Q: Can you guide me on a career path in the field of Hadoop?

A: There is really not much guidance one can give when it comes to Hadoop. Everything is out there. Tons of blog posts, books and the Apache project pages should easily give you an idea of how Hadoop is doing and what is required to pick it up. If you can play around with it and get a feel for it, you yourself should be the best person to decide whether this is for you. Just put your head down and work, work real hard.

Q: Can you guide me to crack the interview?

A: Frankly, I am not sure how this is even possible. If you are prepared you will crack the interview. There is no set way or a magic pill to crack an interview. Read the job description, try and speak to others who do a similar job, prepare a good resume and give it your best shot. All the best!

Q: Can you please answer the below exam questions?

A: NO.

Q: Should I do the Admin course or the Developer course?

A: If managing a cluster (networking, monitoring, troubleshooting, Linux stuff) is what interests you, you should do the administrator course. If you like to write code (Pig scripts, Hive queries, MapReduce, etc.) and solve data problems (which includes data engineering problems), you should consider doing a developer centric course. However, before doing any course, try doing some self-study and get a feel for it.

Q: I am a Tester; will Hadoop be a good fit for me?

A: Anyone interested can do Hadoop. But if the question is, “Do Testers have a role in the Hadoop world?” then the answer is YES. A Hadoop distribution company needs to have testers who will test all the components and their inter-operability before they release a distribution. A tester with good understanding of Hadoop and its inner workings would be a great fit for such organizations.

Q: I followed the steps mentioned in your post, but it does not work for me. Can you help?

A: I would love to help, but practically it is just not possible. I can help if I can spot something obvious, but if it requires a lot of back and forth in the form of comments or emails, I really won’t have the time. Ideally, if you follow the instructions as they are (including the versions of the components used), the steps should work.

Q: I have X years of experience in XYZ field, will Hadoop give me a good start?

A: You should be getting into any field, because you want to get into it, not because it will give you a good start. If the technology excites you, you should play around with it, talk about it to others in the field and work hard to learn it. The same goes for Hadoop.

Q: What is the salary I can get if I learn Hadoop?

A: I don’t know.

Q: Is it mandatory to learn UNIX scripting to learn Hadoop?

A: It is NOT mandatory.

Q: Can you provide Hadoop certification and exam details?

A: NO.

Q: Where can I get large data sets for processing data in Hadoop?

A: Google it, you will find tons of free data sets from various domains. A quick search shows the following link from Quora – http://www.quora.com/Where-can-I-find-large-datasets-open-to-the-public

Q: Where should I start, if I need to learn Hadoop?

A: The Apache project pages, YouTube/Vimeo videos, blog posts and the book Hadoop: The Definitive Guide.

Q: Could you please suggest universities that are reputed among companies in the Silicon Valley?

A: I don’t know the answer to this.

Q: I have enrolled for a course with XYZ course provider. Are they good trainers? Can you suggest any good trainers? Do you teach?

A: No, I am not aware of any good trainers or good training institutes. Also, I am not a trainer and I don’t teach Hadoop. I blog about it, so, you may get to learn something from my blog posts. Also, www.hadoopscreencasts.com was something that I started to share my learning. If I do get the time to work on it, you should see more videos there.

Q: Will coding questions be asked in the certification exams?

A: I don’t know.

Q: I have done administration certification; can I clear the developer certification?

A: I don’t know. Look at the respective certification details for this.

Q: Is Hadoop a good career choice? Will Hadoop boost my career?

A: Hadoop is being used by many organizations and there is a lot of requirement for people who have Hadoop knowledge. Having said that, there are many such technologies and frameworks that are in demand. You have to do what you like to do.

Q: Which is better, Pig or Hive?

A: Both are good and are being used widely. If you like writing scripts, you can try Pig. If you like writing SQL like queries, use Hive.

Q: Can you suggest some projects that deal with Big Data?

A: For this, you will have to be creative. If you can’t think of anything, redo what people have already done. Take data sets that are freely available and think of all the questions you can ask.

Q: Do you teach Hadoop?

A: I don’t teach Hadoop. I blog and have been trying to work on www.hadoopscreencasts.com whenever I get the time.

Q: Is it difficult to get a job in the Hadoop field?

A: NO. If you can work hard and prove yourself, I think it should be easy.

XML Data Processing Using Talend Open Studio


For my Netflix Search Engine project, Flicksery, I get data from Netflix as XML files. The catalog file for instant titles is around 600 MB. Following is a sample XML entry for one title/movie.

[Figure: sample XML entry for one title]
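To give a rough idea of the shape of the data, here is a heavily trimmed, hypothetical sketch of a single entry. The element names here are only illustrative; the real entries carry many more nested elements and attributes:

<catalog_titles>
  <catalog_title>
    <id>http://api-public.netflix.com/catalog/titles/movies/780726</id>
    <title>The Mummy</title>
    <box_art>http://cdn0.nflximg.net/images/0270/2990270.jpg</box_art>
    <release_year>1932</release_year>
    <average_rating>3.5</average_rating>
    <runtime>4388</runtime>
    <!-- ... many more elements per title ... -->
  </catalog_title>
  <!-- ... thousands more catalog_title entries ... -->
</catalog_titles>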

The real entries are not very easy to read: not just the alignment, but the sheer number of elements too. Maybe we could use a better text editor or an XML editor to view them properly, but it would still be very difficult to play around with the data and do any kind of transformation or analysis. Talend Open Studio is something that can be really useful for analyzing the data embedded within a large XML file like this.

In this post we will try and analyze the XML file using some really neat features available in Talend Open Studio.

Let’s get started:

Open up Talend Open Studio and create a new project – NetflixDataAnalysis:

[Screenshot: creating the NetflixDataAnalysis project]

Right click on Job Designs and select Create job. Name the job InstantFeedTest:

[Screenshot: creating the InstantFeedTest job]

Right click on File xml and select Create file xml:

[Screenshot: Create file xml]

This brings up a wizard. Enter the details as shown below and click Next:

[Screenshot: step 1 of the wizard]

In step 2 of the wizard, select Input XML and click Next.

[Screenshot: step 2 of the wizard]

In step 3 of the wizard select the XML file. For this test, I took only part of the XML file as loading the entire 600 MB file would cause Java Heap issues and would prevent the file from loading correctly. As we just want to analyze and see the different fields available in the XML, a sample should be sufficient. Once the file is selected you should see the schema of the XML file in the Schema Viewer.

[Screenshot: step 3 of the wizard, with the Schema Viewer]

Step 4 is where you start to see the real power of  Talend Open Studio. The Source Schema list on the left displays the schema of the XML file.  The Target Schema section provides you with a way of defining an output schema for the XML. Using XPath you can now define the required elements from the input XML file. You can drag the element which will repeat itself in the XML to Xpath loop expression section. In this case the element catalog_title is the element that embeds all information for a single movie/title.

Next, you can traverse through all the nodes on the left and drag the required elements to the right under the Fields to extract section. You can also provide custom column names under the Column Name section. Once you are done dragging all the required fields, click on Refresh Preview to see a preview of the data. This preview helps one get a quick idea of how the data will be parsed. Click Finish.

[Screenshot: step 4 of the wizard, XPath mapping and preview]

Double click on the job, InstantFeedTest to open it up in the workspace. Drag the newly created XML Metadata, NetflixInstantFeed. Also, drag the tFileOutputDelimited_1 component from the Palette on the right.

[Screenshot: the job workspace with NetflixInstantFeed and tFileOutputDelimited_1]

Right click on the NetflixInstantFeed node, select Row->Main and join it to the tFileOutputDelimited_1 node. Once joined, it should look like the image below:

[Screenshot: NetflixInstantFeed joined to tFileOutputDelimited_1]

Select the tFileOutputDelimited_1 node and go to the "Component" tab at the bottom of the workspace. Update the configuration: set the Field Separator to "," and set the File Name to the desired path and name.

[Screenshot: tFileOutputDelimited_1 component settings]

We are now ready to test out our job. Click on the Run icon on the toolbar to execute the job. Once executed, you should see the processing status as shown below:

[Screenshot: job execution status]

The above job is going to read the XML file, extract the fields and generate a comma separated text file with the extracted data.

http://api-public.netflix.com/catalog/titles/movies/780726,The Mummy,http://cdn0.nflximg.net/images/0270/2990270.jpg,When British archaeologists uncover the ancient sarcophagus of a mummified Egyptian priest (Boris Karloff), they foolishly ignore its warning not to open the box. Now brought back to life, the mummy tries to resurrect the soul of his long-dead love.,,Top 100 Thrills nominee,,,1346482800,4102444800,NR,MPAA,4388,1.77:1,1932,Classic Movies,3.5,1387701023800,,,

As you can see, the big XML node is now more readable as a simple comma separated record. This was a simple one-to-one mapping from XML to CSV. Talend Open Studio is way more powerful than this. You can add a new component to the job to apply transformations to the data coming in from the XML.

As you see in the above record, the first column/value in the comma separated record is a link. All I am interested in is the last 6 digits of the link; I want my final output to have only those 6 digits and not the entire link. To do this, delete the link between the NetflixInstantFeed and tFileOutputDelimited_1 nodes. Next, drag the tMap component from the Palette to the job workspace. Right click on NetflixInstantFeed, select Row->Main and join it to tMap_1. Next, right click on tMap_1, select Row->New Output (Main) and join it to the tFileOutputDelimited_1 node. You will be prompted to enter a name; the name entered for this example is processed_op. Once done, the job should look as shown below:

[Screenshot: the job with tMap_1 between NetflixInstantFeed and tFileOutputDelimited_1]

Select the tMap_1 component and click on the Map Editor button on the Component tab at the bottom of the workspace. The Map Editor opens up with the metadata information from the XML file on the left and the desired output on the right. As you can see below, I have dragged all the columns from the left to the right. The only modification I have made is for the "id" column, where I have applied a function to get only the last 6 digits from the right.
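Expressions in the tMap are plain Java. Purely as a hypothetical illustration of the idea (the job itself uses one of Talend's built-in routines, as noted below), the transformation on the id column amounts to something like this:

// Keep only the trailing 6 characters of the catalog link.
String id = "http://api-public.netflix.com/catalog/titles/movies/780726";
String shortId = id.substring(id.length() - 6);   // "780726"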
[Screenshot: the Map Editor with the columns mapped and the id expression modified]

As you can see we can easily apply functions to transform the input data. The function shown above is a StringHandling function. There are several other functions that can be applied using the Expression Builder as shown below:

[Screenshot: the Expression Builder]

After you are done applying your function, click OK to close the screen. Now you can re-run the job to see if the transformation has been applied correctly. After a successful run you should see the results of the job as shown below:

[Screenshot: a successful job run]

Let us look at the output file to see the effect of the transformation:

780726,The Mummy,http://cdn0.nflximg.net/images/0270/2990270.jpg,When British archaeologists uncover the ancient sarcophagus of a mummified Egyptian priest (Boris Karloff), they foolishly ignore its warning not to open the box. Now brought back to life, the mummy tries to resurrect the soul of his long-dead love.,,Top 100 Thrills nominee,,,1346482800,4102444800,NR,MPAA,4388,1.77:1,1932,Classic Movies,3.5,1387701023800,,,

We have successfully built a job to transform an XML file into a comma separated file. This job can be exported and run as a standalone job on any environment running Java. Also, while we chose to output the data to a CSV file, Talend Open Studio can read from multiple data formats and databases, and can also write to different file formats or insert directly into databases.

This was just a quick introduction (the tip of the iceberg) to experience the usefulness of Talend Open Studio for data processing. The features of this tool are vast and can't be covered in a single blog post, but this should get you started on using Talend. Hope you found this fast-paced tutorial useful.

Apache Oozie Installation

In this post we will be going through the steps to install Apache Oozie Server and Client. These instructions assume that you have Hadoop installed and running.

My Hadoop Location : /home/hduser/hadoop

From your home directory execute the following commands (my home directory is /home/hduser):

$ pwd
/home/hduser

Download Oozie

$ wget http://supergsego.com/apache/oozie/3.3.2/oozie-3.3.2.tar.gz

Untar

$ tar xvzf oozie-3.3.2.tar.gz

Build Oozie

$ cd oozie-3.3.2/bin
$ ./mkdistro.sh -DskipTests

Oozie Server Setup

Copy the built binaries to the home directory as ‘oozie’

$ cd ../../
$ cp -R oozie-3.3.2/distro/target/oozie-3.3.2-distro/oozie-3.3.2/ oozie

Create the required libext directory

$ cd oozie
$ mkdir libext

Copy all the required jars from hadooplibs to the libext directory using the following command:

$ cp ../oozie-3.3.2/hadooplibs/target/oozie-3.3.2-hadooplibs.tar.gz .
$ tar xzvf oozie-3.3.2-hadooplibs.tar.gz
$ cp oozie-3.3.2/hadooplibs/hadooplib-1.1.1.oozie-3.3.2/* libext/

Get ExtJS – This library is not bundled with Oozie and needs to be downloaded separately. It is used for the Oozie Web Console:

$ cd libext
$ wget http://extjs.com/deploy/ext-2.2.zip
$ cd ..

Update ../hadoop/conf/core-site.xml as follows:

<property>
<name>hadoop.proxyuser.hduser.hosts</name>
<value>localhost</value>
</property>
<property>
<name>hadoop.proxyuser.hduser.groups</name>
<value>hadoop</value>
</property>

Here, ‘hduser’ is the username and it belongs to the ‘hadoop’ group.
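Note that Hadoop needs to be restarted for the proxyuser settings to take effect. On a single-node Hadoop 1.x setup like this one, a simple (if heavy-handed) way to do that with the stock scripts under hadoop/bin would be:

$ /home/hduser/hadoop/bin/stop-all.sh
$ /home/hduser/hadoop/bin/start-all.sh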

Prepare the WAR file

$ ./bin/oozie-setup.sh prepare-war

setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"

INFO: Adding extension: /home/hduser/oozie/libext/commons-beanutils-1.7.0.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-beanutils-core-1.8.0.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-codec-1.4.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-collections-3.2.1.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-configuration-1.6.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-digester-1.8.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-el-1.0.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-io-2.1.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-lang-2.4.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-logging-1.1.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-math-2.1.jar
INFO: Adding extension: /home/hduser/oozie/libext/commons-net-1.4.1.jar
INFO: Adding extension: /home/hduser/oozie/libext/hadoop-client-1.1.1.jar
INFO: Adding extension: /home/hduser/oozie/libext/hadoop-core-1.1.1.jar
INFO: Adding extension: /home/hduser/oozie/libext/hsqldb-1.8.0.7.jar
INFO: Adding extension: /home/hduser/oozie/libext/jackson-core-asl-1.8.8.jar
INFO: Adding extension: /home/hduser/oozie/libext/jackson-mapper-asl-1.8.8.jar
INFO: Adding extension: /home/hduser/oozie/libext/log4j-1.2.16.jar
INFO: Adding extension: /home/hduser/oozie/libext/oro-2.0.8.jar
INFO: Adding extension: /home/hduser/oozie/libext/xmlenc-0.52.jar

New Oozie WAR file with added 'ExtJS library, JARs' at /home/hduser/oozie/oozie-server/webapps/oozie.war

INFO: Oozie is ready to be started

Create sharelib on HDFS

$ ./bin/oozie-setup.sh sharelib create -fs hdfs://localhost:54310
setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"
the destination path for sharelib is: /user/hduser/share/lib

Create the Oozie DB

$ ./bin/ooziedb.sh create -sqlfile oozie.sql -run
setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"

Validate DB Connection
DONE
Check DB schema does not exist
DONE
Check OOZIE_SYS table does not exist
DONE
Create SQL schema
DONE
Create OOZIE_SYS table
DONE

Oozie DB has been created for Oozie version '3.3.2'

The SQL commands have been written to: oozie.sql

To start Oozie as a daemon use the following command:

$ ./bin/oozied.sh start

Setting OOZIE_HOME: /home/hduser/oozie
Setting OOZIE_CONFIG: /home/hduser/oozie/conf
Sourcing: /home/hduser/oozie/conf/oozie-env.sh
setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"
Setting OOZIE_CONFIG_FILE: oozie-site.xml
Setting OOZIE_DATA: /home/hduser/oozie/data
Setting OOZIE_LOG: /home/hduser/oozie/logs
Setting OOZIE_LOG4J_FILE: oozie-log4j.properties
Setting OOZIE_LOG4J_RELOAD: 10
Setting OOZIE_HTTP_HOSTNAME: rohit-VirtualBox
Setting OOZIE_HTTP_PORT: 11000
Setting OOZIE_ADMIN_PORT: 11001
Setting OOZIE_HTTPS_PORT: 11443
Setting OOZIE_BASE_URL: http://rohit-VirtualBox:11000/oozie
Setting CATALINA_BASE: /home/hduser/oozie/oozie-server
Setting OOZIE_HTTPS_KEYSTORE_FILE: /home/hduser/.keystore
Setting OOZIE_HTTPS_KEYSTORE_PASS: password
Setting CATALINA_OUT: /home/hduser/oozie/logs/catalina.out
Setting CATALINA_PID: /home/hduser/oozie/oozie-server/temp/oozie.pid

Using CATALINA_OPTS: -Xmx1024m -Dderby.stream.error.file=/home/hduser/oozie/logs/derby.log
Adding to CATALINA_OPTS: -Doozie.home.dir=/home/hduser/oozie -Doozie.config.dir=/home/hduser/oozie/conf -Doozie.log.dir=/home/hduser/oozie/logs -Doozie.data.dir=/home/hduser/oozie/data -Doozie.config.file=oozie-site.xml -Doozie.log4j.file=oozie-log4j.properties -Doozie.log4j.reload=10 -Doozie.http.hostname=rohit-VirtualBox -Doozie.admin.port=11001 -Doozie.http.port=11000 -Doozie.https.port=11443 -Doozie.base.url=http://rohit-VirtualBox:11000/oozie -Doozie.https.keystore.file=/home/hduser/.keystore -Doozie.https.keystore.pass=password -Djava.library.path=

Using CATALINA_BASE: /home/hduser/oozie/oozie-server
Using CATALINA_HOME: /home/hduser/oozie/oozie-server
Using CATALINA_TMPDIR: /home/hduser/oozie/oozie-server/temp
Using JRE_HOME: /usr/lib/jvm/java-6-oracle
Using CLASSPATH: /home/hduser/oozie/oozie-server/bin/bootstrap.jar
Using CATALINA_PID: /home/hduser/oozie/oozie-server/temp/oozie.pid

To start Oozie as a foreground process use the following command:

$ ./bin/oozied.sh run

Check the Oozie log file logs/oozie.log to ensure Oozie started properly.

Use the following command to check the status of Oozie from command line:

$ ./bin/oozie admin -oozie http://localhost:11000/oozie -status
System mode: NORMAL

URL for the Oozie Web Console is http://localhost:11000/oozie

[Screenshot: Oozie Web Console]

Oozie Client Setup

$ cd ..
$ cp oozie/oozie-client-3.3.2.tar.gz .
$ tar xvzf oozie-client-3.3.2.tar.gz
$ mv oozie-client-3.3.2 oozie-client
$ cd bin

Add /home/hduser/oozie-client/bin to the PATH in your .bashrc and restart your terminal.
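For example, the line added to .bashrc could look like this:

export PATH=$PATH:/home/hduser/oozie-client/bin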

Your Oozie Server and Client setup on a single node cluster is now ready. In the next post, we will configure and schedule some Oozie workflows.

Apache Pig Tutorial – Part 2

Let’s have a quick look at the FILTER command from our Part 1:

grunt> movies_greater_than_four = FILTER movies BY (float)rating>4.0;

Here, we see a (float) cast placed before the column ‘rating’. This is done to tell Pig that the column we are working on is of type float. Pig was not informed about the type of the column when the data was loaded.

Following is the command we used to load the data:

grunt> movies = LOAD '/home/hduser/pig/myscripts/movies_data.csv' USING PigStorage(',') as (id,name,year,rating,duration);

The load command specified only the column names. We can modify the statement as follows to include the data type of the columns:

grunt> movies = LOAD '/home/hduser/pig/myscripts/movies_data.csv' USING PigStorage(',') as (id:int,name:chararray,year:int,rating:double,duration:int);

In the above statement, name is a chararray (string), rating is a double and the fields id, year and duration are integers. If the data is loaded using the above statement, we do not need to cast the column during filtering.
The data types used in the above statement are called scalar data types. Other scalar types include long, float and bytearray.

To get better at using filters, let’s ask the data a few more questions:

List the movies that were released between 1950 and 1960

grunt> movies_between_50_60 = FILTER movies by year>1950 and year<1960;

List the movies that start with the alphabet A

grunt> movies_starting_with_A = FILTER movies by name matches 'A.*';

List the movies that have a duration greater than 2 hours

grunt> movies_duration_2_hrs = FILTER movies by duration > 7200;

List the movies that have rating between 3 and 4

grunt> movies_rating_3_4 = FILTER movies BY rating>3.0 and rating<4.0;

DESCRIBE

The schema of a relation/alias can be viewed using the DESCRIBE command:

grunt> DESCRIBE movies;
movies: {id: int,name: chararray,year: int,rating: double,duration: int}

ILLUSTRATE

To view the step-by-step execution of a sequence of statements you can use the ILLUSTRATE command:

grunt> ILLUSTRATE movies_duration_2_hrs;

------------------------------------------------------------------------------------------------------------------------
| movies | id:int | name:chararray | year:int | rating:double | duration:int |
------------------------------------------------------------------------------------------------------------------------
| | 1567 | Barney: Sing & Dance with Barney | 2004 | 2.7 | 3244 |
| | 3045 | Strange Circus | 2005 | 2.8 | 6509 |
------------------------------------------------------------------------------------------------------------------------

---------------------------------------------------------------------------------------------------------------------
| movies_duration_2_hrs | id:int | name:chararray | year:int | rating:double | duration:int |
---------------------------------------------------------------------------------------------------------------------
| | 3045 | Strange Circus | 2005 | 2.8 | 6509 |
---------------------------------------------------------------------------------------------------------------------

DESCRIBE and ILLUSTRATE are really useful for debugging.

Complex Types
Pig supports three different complex types to handle data. It is important that you understand these types properly as they will be used very often when working with data.

Tuples
A tuple is just like a row in a table. It is a comma separated list of fields.

(49539,'The Magic Crystal',2013,3.7,4561)

The above tuple has five fields. A tuple is surrounded by brackets.

Bags
A bag is an unordered collection of tuples.

{ (49382, 'Final Offer'), (49385, 'Delete') }

The above bag has two tuples. Each tuple has two fields: id and movie name.

Maps
A map is a <key, value> store. The key and value are joined together using #.

['name'#'The Magic Crystal', 'year'#2013]

The above map has two keys, name and year, with values ‘The Magic Crystal’ and 2013. The first value is a chararray and the second one is an integer.

We will be using the above complex types quite often in our future examples.

FOREACH

FOREACH gives a simple way to apply transformations based on columns. Let’s understand this with an example.

List the movie names and their duration in minutes

grunt> movie_duration = FOREACH movies GENERATE name, (double)(duration/60);

The above statement generates a new alias that has the list of movies and their duration in minutes.
You can check the results using the DUMP command.

GROUP

The GROUP keyword is used to group fields in a relation.

List the years and the number of movies released each year.

grunt> grouped_by_year = group movies by year;
grunt> count_by_year = FOREACH grouped_by_year GENERATE group, COUNT(movies);

You can check the result by dumping the count_by_year relation on the screen.
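You can also DESCRIBE the grouped relation to see the complex types from the previous section in action; the grouped movie records show up as a bag. The output should look roughly as follows (exact spacing may vary with the Pig version):

grunt> DESCRIBE grouped_by_year;
grouped_by_year: {group: int,movies: {(id: int,name: chararray,year: int,rating: double,duration: int)}}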

We know in advance that the total number of movies in the dataset is 49590. We can check whether our GROUP operation is correct by verifying the total of the COUNT field. If the sum of the count field is 49590, we can be confident that our grouping has worked correctly.

grunt> group_all = GROUP count_by_year ALL;
grunt> sum_all = FOREACH group_all GENERATE SUM(count_by_year.$1);
grunt> DUMP sum_all;

From the above three statements, the first statement, GROUP ALL, groups all the tuples into one group. This is very useful when we need to perform aggregation operations on the entire set.
The next statement performs a FOREACH on the grouped relation group_all and applies the SUM function to the field in position 1 (positions start from 0). Here, the field in position 1 holds the counts of movies for each year. On execution of the DUMP statement, the MapReduce program kicks off and gives us the following result:

(49590)

The above value matches our known fact that the dataset has 49590 movies, so we can conclude that our GROUP operation worked successfully.

ORDER BY

Let us question the data to illustrate the ORDER BY operation.

List all the movies in the ascending order of year.

grunt> asc_movies_by_year = ORDER movies BY year ASC;
grunt> DUMP asc_movies_by_year;

List all the movies in the descending order of year.

grunt> desc_movies_by_year = ORDER movies by year DESC;
grunt> DUMP desc_movies_by_year;

DISTINCT

The DISTINCT statement is used to remove duplicate records. It works only on entire records, not on individual fields.
Let’s illustrate this with an example:

grunt> movies_with_dups = LOAD '/home/hduser/pig/myscripts/movies_with_duplicates.csv' USING PigStorage(',') as (id:int,name:chararray,year:int,rating:double,duration:int);
grunt> DUMP movies_with_dups;

(1,The Nightmare Before Christmas,1993,3.9,4568)
(1,The Nightmare Before Christmas,1993,3.9,4568)
(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(5,Night Tide,1963,2.8,5126)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
(9,Nosferatu: Original Version,1929,3.5,5651)

You can see that there are duplicates in this data set. Now let us list the distinct records present in movies_with_dups:

grunt> no_dups = DISTINCT movies_with_dups;
grunt> DUMP no_dups;

(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)

LIMIT

Use the LIMIT keyword to get only a limited number of results from a relation.

grunt> top_10_movies = LIMIT movies 10;
grunt> DUMP top_10_movies;

(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)

SAMPLE
Use the SAMPLE keyword to get a sample set from your data.

grunt> sample_10_percent = sample movies 0.1;
grunt> dump sample_10_percent;

Here, 0.1 = 10%.
As we already know, the file has 49590 records. We can check the count of records in the sampled relation.

grunt> sample_group_all = GROUP sample_10_percent ALL;
grunt> sample_count = FOREACH sample_group_all GENERATE COUNT(sample_10_percent.$0);
grunt> dump sample_count;

The output is (4937), which is approximately 10% of 49590.

In this post we have touched upon some important operations used in Pig. I suggest that you try out all the samples as you go through this tutorial, as it is the doing that registers and not the reading. In the next post we will learn a few more operations dealing with data transformation.

All code and data for this post can be downloaded from github.

Apache Pig Tutorial – Part 1

Apache Pig is a tool used to analyze large amounts of data by representing them as data flows. Using the Pig Latin scripting language, operations like ETL (Extract, Transform and Load), ad hoc data analysis and iterative processing can be easily achieved.

Pig is an abstraction over MapReduce. In other words, all Pig scripts internally are converted into Map and Reduce tasks to get the task done. Pig was built to make programming MapReduce applications easier. Before Pig, Java was the only way to process the data stored on HDFS.

Pig was first built at Yahoo! and later became a top level Apache project. In this series of posts we will walk through the different features of Pig using a sample dataset.

Dataset

The dataset that we are using here is from one of my projects called Flicksery. Flicksery is a Netflix Search Engine. The dataset is a simple text file (movies_data.csv) that lists movie names and their details, like release year, rating and runtime.

A sample of the dataset is as follows:

1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1932,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1991,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1985,3.8,5333
7,Muriel's Wedding,1994,3.5,6323
8,Mother's Boys,1994,3.4,5733
9,Nosferatu: Original Version,1929,3.5,5651
10,Nick of Time,1995,3.4,5333

All code and data for this post can be downloaded from github. The file has a total of 49590 records.

Installing Pig

Download Pig

$ wget http://mirror.symnds.com/software/Apache/pig/pig-0.12.0/pig-0.12.0.tar.gz

Untar

$ tar xvzf pig-0.12.0.tar.gz

Rename the folder for easier access:

$ mv pig-0.12.0 pig

Update .bashrc to add the following:

export PATH=$PATH:/home/hduser/pig/bin

Pig can be started in one of the following two modes:

  1. Local Mode
  2. Cluster Mode

Using the ‘-x local’ option starts Pig in local mode, whereas executing the pig command without any options starts Pig in cluster mode. When in local mode, Pig can access files on the local file system. In cluster mode, Pig can access files on HDFS.

Restart your terminal and execute the pig command as follows:

To start in Local Mode:

$ pig -x local
2013-12-25 20:16:26,258 [main] INFO org.apache.pig.Main - Apache Pig version 0.12.0 (r1529718) compiled Oct 07 2013, 12:20:14
2013-12-25 20:16:26,259 [main] INFO org.apache.pig.Main - Logging error messages to: /home/hduser/pig/myscripts/pig_1388027786256.log
2013-12-25 20:16:26,281 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /home/hduser/.pigbootup not found
2013-12-25 20:16:26,381 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
grunt>

To start in Cluster Mode:

$ pig
2013-12-25 20:19:42,274 [main] INFO org.apache.pig.Main - Apache Pig version 0.12.0 (r1529718) compiled Oct 07 2013, 12:20:14
2013-12-25 20:19:42,274 [main] INFO org.apache.pig.Main - Logging error messages to: /home/hduser/pig/myscripts/pig_1388027982272.log
2013-12-25 20:19:42,300 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /home/hduser/.pigbootup not found
2013-12-25 20:19:42,463 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:54310
2013-12-25 20:19:42,672 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: hdfs://localhost:9001
grunt>

This command presents you with a grunt shell. The grunt shell allows you to execute PigLatin statements to quickly test out data flows on your data step by step without having to execute complete scripts. Pig is now installed and we can go ahead and start using Pig to play with data.

Pig Latin

To learn Pig Latin, let’s question the data. Before we start asking questions, we need the data to be accessible in Pig.

Use the following command to load the data:

grunt> movies = LOAD '/home/hduser/pig/myscripts/movies_data.csv' USING PigStorage(',') as (id,name,year,rating,duration);

The above statement is made up of two parts. The part to the left of “=” is called the relation or alias. It looks like a variable but you should note that this is not a variable. When this statement is executed, no MapReduce task is executed.

Since our dataset has records with fields separated by a comma we use the keyword USING PigStorage(‘,’).
Another thing we have done in the above statement is giving the names to the fields using the ‘as’ keyword.

Now, let’s test to see if the alias has the data we loaded.

grunt> DUMP movies;

Once you execute the above statement, you should see a lot of text on the screen (partial text shown below).

2013-12-25 23:03:04,550 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN
2013-12-25 23:03:04,633 [main] INFO org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}
2013-12-25 23:03:04,748 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2013-12-25 23:03:04,805 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2013-12-25 23:03:04,805 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2013-12-25 23:03:04,853 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job

................

HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.1.2 0.12.0 hduser 2013-12-25 23:03:04 2013-12-25 23:03:05 UNKNOWN

Success!

Job Stats (time in seconds):
JobId Alias Feature Outputs
job_local_0001 movies MAP_ONLY file:/tmp/temp-1685410826/tmp1113990343,

Input(s):
Successfully read records from: "/home/hduser/pig/myscripts/movies_data.csv"

Output(s):
Successfully stored records in: "file:/tmp/temp-1685410826/tmp1113990343"

Job DAG:
job_local_0001

................

(49586,Winter Wonderland,2013,2.8,1812)
(49587,Top Gear: Series 19: Africa Special,2013,,6822)
(49588,Fireplace For Your Home: Crackling Fireplace with Music,2010,,3610)
(49589,Kate Plus Ei8ht,2010,2.7,)
(49590,Kate Plus Ei8ht: Season 1,2010,2.7,)

It is only after the DUMP statement that a MapReduce job is initiated. As we see our data in the output we can confirm that the data has been loaded successfully.

Now, since we have the data in Pig, let’s start with the questions.

List the movies that have a rating greater than 4

grunt> movies_greater_than_four = FILTER movies BY (float)rating>4.0;
grunt> DUMP movies_greater_than_four;

The above statement filters the alias movies and stores the results in a new alias, movies_greater_than_four. The movies_greater_than_four alias will have only the records of movies where the rating is greater than 4.

The DUMP command is only used to display information onto the standard output. If you need to store the data to a file you can use the following command:

grunt> store movies_greater_than_four into '/user/hduser/movies_greater_than_four';
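The store command writes the relation as part files inside the given directory rather than as a single file. Assuming Pig was started in cluster mode so that the path above is on HDFS, you can inspect the stored results with something like:

$ hadoop fs -cat /user/hduser/movies_greater_than_four/*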

In this post we got a good feel of Apache Pig. We loaded some data and executed some basic commands to query it. The next post will dive deeper into Pig Latin where we will learn some advanced techniques to do data analysis.

Apache Sqoop

Apache Sqoop(TM) is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.- from http://sqoop.apache.org/

In this post we will get hands on with Sqoop and perform imports and exports of data to and from HDFS respectively. So let’s get started.

Installation

Download Apache Sqoop using the following command:

$ wget http://mirrors.gigenet.com/apache/sqoop/1.4.4/sqoop-1.4.4.bin__hadoop-1.0.0.tar.gz

Untar the file:

$ tar xvzf sqoop-1.4.4.bin__hadoop-1.0.0.tar.gz

Rename the folder to something simpler:

$ mv sqoop-1.4.4.bin__hadoop-1.0.0 sqoop

Make sure the following variables are set for the session. You can add them to your .bashrc file

export HADOOP_COMMON_HOME=/home/hduser/hadoop
export HADOOP_MAPRED_HOME=/home/hduser/hadoop

I just pointed them to the Hadoop home directory location.

Importing data from MySql

For this post I have a database named sqoop_test. The database has a table named movies, which contains two columns: movie and rating.
I would like to import this data to HDFS using Sqoop.

mysql> select * from movies;
+--------------------------------+--------+
| movie | rating |
+--------------------------------+--------+
| The Nightmare Before Christmas | 3.4 |
| The Mummy | 2.6 |
| Orphans of the Storm | 4.5 |
| The Object of Beauty | 3 |
| Night Tide | 2 |
| One Magic Christmas | 1.2 |
| Nosferatu: Original Version | 2.3 |
| Nick of Time | 3.6 |
+--------------------------------+--------+
8 rows in set (0.00 sec)

Before we import, we will need to get the MySQL JDBC driver (Connector/J), which can be downloaded from the MySQL website.

I downloaded mysql-connector-java-5.1.28-bin.jar and placed it under /home/hduser/sqoop/lib/

To import the data use the following command:

$ sqoop import --connect jdbc:mysql://localhost/sqoop_test --table movies -m 1

Output:

13/12/24 23:21:22 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
13/12/24 23:21:22 INFO tool.CodeGenTool: Beginning code generation
13/12/24 23:21:22 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `movies` AS t LIMIT 1
13/12/24 23:21:22 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `movies` AS t LIMIT 1
13/12/24 23:21:22 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /home/hduser/hadoop
Note: /tmp/sqoop-hduser/compile/1120535a7891cfa30210e48b3ed06237/movies.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
13/12/24 23:21:23 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hduser/compile/1120535a7891cfa30210e48b3ed06237/movies.jar
13/12/24 23:21:23 WARN manager.MySQLManager: It looks like you are importing from mysql.
13/12/24 23:21:23 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
13/12/24 23:21:23 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
13/12/24 23:21:23 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
13/12/24 23:21:23 INFO mapreduce.ImportJobBase: Beginning import of movies
13/12/24 23:21:25 INFO mapred.JobClient: Running job: job_201312241604_0001
13/12/24 23:21:26 INFO mapred.JobClient: map 0% reduce 0%
13/12/24 23:21:32 INFO mapred.JobClient: map 100% reduce 0%
13/12/24 23:21:32 INFO mapred.JobClient: Job complete: job_201312241604_0001
13/12/24 23:21:32 INFO mapred.JobClient: Counters: 18
13/12/24 23:21:32 INFO mapred.JobClient: Job Counters
13/12/24 23:21:32 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=4705
13/12/24 23:21:32 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/12/24 23:21:32 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/12/24 23:21:32 INFO mapred.JobClient: Launched map tasks=1
13/12/24 23:21:32 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
13/12/24 23:21:32 INFO mapred.JobClient: File Output Format Counters
13/12/24 23:21:32 INFO mapred.JobClient: Bytes Written=183
13/12/24 23:21:32 INFO mapred.JobClient: FileSystemCounters
13/12/24 23:21:32 INFO mapred.JobClient: HDFS_BYTES_READ=87
13/12/24 23:21:32 INFO mapred.JobClient: FILE_BYTES_WRITTEN=60921
13/12/24 23:21:32 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=183
13/12/24 23:21:32 INFO mapred.JobClient: File Input Format Counters
13/12/24 23:21:32 INFO mapred.JobClient: Bytes Read=0
13/12/24 23:21:32 INFO mapred.JobClient: Map-Reduce Framework
13/12/24 23:21:32 INFO mapred.JobClient: Map input records=8
13/12/24 23:21:32 INFO mapred.JobClient: Physical memory (bytes) snapshot=35909632
13/12/24 23:21:32 INFO mapred.JobClient: Spilled Records=0
13/12/24 23:21:32 INFO mapred.JobClient: CPU time spent (ms)=330
13/12/24 23:21:32 INFO mapred.JobClient: Total committed heap usage (bytes)=16252928
13/12/24 23:21:32 INFO mapred.JobClient: Virtual memory (bytes) snapshot=345686016
13/12/24 23:21:32 INFO mapred.JobClient: Map output records=8
13/12/24 23:21:32 INFO mapred.JobClient: SPLIT_RAW_BYTES=87
13/12/24 23:21:32 INFO mapreduce.ImportJobBase: Transferred 183 bytes in 8.3529 seconds (21.9085 bytes/sec)
13/12/24 23:21:32 INFO mapreduce.ImportJobBase: Retrieved 8 records.

As per the logs, all 8 records from the table have been retrieved. Let’s look at the data in HDFS:

$ hadoop fs -cat /user/hduser/movies/

The Nightmare Before Christmas,3.4
The Mummy,2.6
Orphans of the Storm,4.5
The Object of Beauty,3
Night Tide,2
One Magic Christmas,1.2
Nosferatu: Original Version,2.3
Nick of Time,3.6

We have successfully imported data from MySql to HDFS.
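By default, Sqoop wrote the table into /user/hduser/movies on HDFS, that is, a directory named after the table under the user's home directory. If your MySQL instance needs credentials, or you want the data in a specific HDFS directory, the command can be extended with a few standard options. The username, password and target directory below are only placeholders:

$ sqoop import --connect jdbc:mysql://localhost/sqoop_test --table movies --username sqoop_user --password sqoop_pass --target-dir /user/hduser/movies_import -m 1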

Exporting data

Let’s use Sqoop to export data to MySql.

The data I would like to export looks as follows:

The Shawshank Redemption,3.4
Rockstar,2.6
The Rear Window,4.5
Beauty and the Beast,3
Galdiator,3
Nowhere to Run,3.2
Fargo,3.3
Next,3.6

The filename is movies_export.csv and is under the location /user/hduser/exports/

I have a table movies_export defined as follows:

mysql> desc movies_export;
+--------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------+--------------+------+-----+---------+-------+
| movie | varchar(255) | YES | | NULL | |
| rating | varchar(255) | YES | | NULL | |
+--------+--------------+------+-----+---------+-------+
2 rows in set (0.00 sec)

As of now there are no rows in the table:

mysql> select * from movies_export;
Empty set (0.00 sec)

Use the following command to export the data to MySql:

$ sqoop export --connect jdbc:mysql://localhost/sqoop_test --table movies_export --export-dir '/user/hduser/exports' -m 1;

Output:

13/12/24 23:38:50 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
13/12/24 23:38:50 INFO tool.CodeGenTool: Beginning code generation
13/12/24 23:38:50 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `movies_export` AS t LIMIT 1
13/12/24 23:38:50 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `movies_export` AS t LIMIT 1
13/12/24 23:38:50 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /home/hduser/hadoop
Note: /tmp/sqoop-hduser/compile/2b3381307e097e38b60b0e204c1b7a68/movies_export.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
13/12/24 23:38:51 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hduser/compile/2b3381307e097e38b60b0e204c1b7a68/movies_export.jar
13/12/24 23:38:51 INFO mapreduce.ExportJobBase: Beginning export of movies_export
13/12/24 23:38:52 INFO input.FileInputFormat: Total input paths to process : 1
13/12/24 23:38:52 INFO input.FileInputFormat: Total input paths to process : 1
13/12/24 23:38:52 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/12/24 23:38:52 WARN snappy.LoadSnappy: Snappy native library not loaded
13/12/24 23:38:52 INFO mapred.JobClient: Running job: job_201312241604_0002
13/12/24 23:38:53 INFO mapred.JobClient: map 0% reduce 0%
13/12/24 23:38:57 INFO mapred.JobClient: map 100% reduce 0%
13/12/24 23:38:57 INFO mapred.JobClient: Job complete: job_201312241604_0002
13/12/24 23:38:57 INFO mapred.JobClient: Counters: 18
13/12/24 23:38:57 INFO mapred.JobClient: Job Counters
13/12/24 23:38:57 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=3536
13/12/24 23:38:57 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/12/24 23:38:57 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/12/24 23:38:57 INFO mapred.JobClient: Launched map tasks=1
13/12/24 23:38:57 INFO mapred.JobClient: Data-local map tasks=1
13/12/24 23:38:57 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
13/12/24 23:38:57 INFO mapred.JobClient: File Output Format Counters
13/12/24 23:38:57 INFO mapred.JobClient: Bytes Written=0
13/12/24 23:38:57 INFO mapred.JobClient: FileSystemCounters
13/12/24 23:38:57 INFO mapred.JobClient: HDFS_BYTES_READ=279
13/12/24 23:38:57 INFO mapred.JobClient: FILE_BYTES_WRITTEN=60753
13/12/24 23:38:57 INFO mapred.JobClient: File Input Format Counters
13/12/24 23:38:57 INFO mapred.JobClient: Bytes Read=0
13/12/24 23:38:57 INFO mapred.JobClient: Map-Reduce Framework
13/12/24 23:38:57 INFO mapred.JobClient: Map input records=8
13/12/24 23:38:57 INFO mapred.JobClient: Physical memory (bytes) snapshot=35356672
13/12/24 23:38:57 INFO mapred.JobClient: Spilled Records=0
13/12/24 23:38:57 INFO mapred.JobClient: CPU time spent (ms)=300
13/12/24 23:38:57 INFO mapred.JobClient: Total committed heap usage (bytes)=16252928
13/12/24 23:38:57 INFO mapred.JobClient: Virtual memory (bytes) snapshot=345022464
13/12/24 23:38:57 INFO mapred.JobClient: Map output records=8
13/12/24 23:38:57 INFO mapred.JobClient: SPLIT_RAW_BYTES=141
13/12/24 23:38:57 INFO mapreduce.ExportJobBase: Transferred 279 bytes in 5.7845 seconds (48.2327 bytes/sec)
13/12/24 23:38:57 INFO mapreduce.ExportJobBase: Exported 8 records.

As per the logs, 8 records from the file have been exported to MySql. Let’s verify it by querying the table:

mysql> select * from movies_export;
+--------------------------+--------+
| movie | rating |
+--------------------------+--------+
| The Shawshank Redemption | 3.4 |
| Rockstar | 2.6 |
| The Rear Window | 4.5 |
| Beauty and the Beast | 3 |
| Galdiator | 3 |
| Nowhere to Run | 3.2 |
| Fargo | 3.3 |
| Next | 3.6 |
+--------------------------+--------+
8 rows in set (0.00 sec)

We have successfully exported data from HDFS to MySql using Sqoop.
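The export worked without any extra options because the file uses commas, which is the field delimiter Sqoop expects by default. If your HDFS files used a different delimiter, say tabs, you could tell Sqoop about it with the --input-fields-terminated-by option, roughly as follows:

$ sqoop export --connect jdbc:mysql://localhost/sqoop_test --table movies_export --export-dir '/user/hduser/exports' --input-fields-terminated-by '\t' -m 1;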

This was just an introduction. Sqoop sports tons of features and to read more about it you can read the official Sqoop documentation at : http://sqoop.apache.org/

Enjoy !

Apache Hadoop Streaming

You can find my screencast for Apache Hadoop Streaming here at www.hadoopscreencasts.com

Apache Hadoop Streaming is a feature that allows developers to write MapReduce applications using languages like Python, Ruby, etc. A language that can read from standard input (STDIN) and write to standard output (STDOUT) can be used to write MapReduce applications.

In this post, I use Ruby to write the map and reduce functions.

First, let’s have some sample data. For a simple test, I have one file that has just one line with a few repeating words.
The contents of the file (sample.txt) are as follows:

she sells sea shells on the sea shore where she sells fish too

Next, let’s create a ruby file for the map function and call it map.rb

Contents of map.rb

#!/usr/bin/env ruby

STDIN.each do |line|
  line.split.each do |word|
    puts word + "\t" + "1"
  end
end

In the above map code, we are splitting each line into words and emitting each word as a key with value 1.
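You can quickly check the mapper on its own before wiring up the full pipeline; the tab between each word and its count is rendered as a space here, as in the rest of this post:

$ echo "she sells sea shells" | ruby map.rb
she 1
sells 1
sea 1
shells 1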

Now, let’s create a ruby file for the reduce function and call it reduce.rb

Contents of reduce.rb

#!/usr/bin/env ruby

prev_key = nil
init_val = 1

STDIN.each do |line|
  key, value = line.split("\t")
  if prev_key != nil && prev_key != key
    puts prev_key + "\t" + init_val.to_s
    prev_key = key
    init_val = 1
  elsif prev_key == nil
    prev_key = key
  elsif prev_key == key
    init_val = init_val + value.to_i
  end
end

puts prev_key + "\t" + init_val.to_s

In the above reduce code we take in each key and sum up the values for that key before printing.

We are now ready to test the map and reduce function locally before we run on the cluster.

Execute the following:

$ cat sample.txt | ruby map.rb | sort | ruby reduce.rb

The output should be as follows:

fish 1
on 1
sea 2
sells 2
she 2
shells 1
shore 1
the 1
too 1
where 1

In the above command we have provided the contents of the file sample.txt as input to map.rb, which in turn provides data to reduce.rb. The data is sorted before it is sent to the reducer.
It looks like our program is working as expected. Now it’s time to deploy this on a Hadoop cluster.

First, let’s move the sample data to a folder in HDFS:

$ hadoop fs -copyFromLocal sample.txt /user/data/

Once we have our sample data in HDFS we can execute the following command from the hadoop/bin folder to execute our MapReduce job:

$ hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar -file map.rb -mapper map.rb -file reduce.rb -reducer reduce.rb -input /user/data/* -output /user/wc

If everything goes fine you should see the following output on your terminal:

packageJobJar: [map.rb, reduce.rb, /home/hduser/tmp/hadoop-unjar2392048729049303810/] [] /tmp/streamjob3038768339999397115.jar tmpDir=null
13/12/12 10:25:01 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/12/12 10:25:01 WARN snappy.LoadSnappy: Snappy native library not loaded
13/12/12 10:25:01 INFO mapred.FileInputFormat: Total input paths to process : 1
13/12/12 10:25:01 INFO streaming.StreamJob: getLocalDirs(): [/home/hduser/tmp/mapred/local]
13/12/12 10:25:01 INFO streaming.StreamJob: Running job: job_201312120020_0007
13/12/12 10:25:01 INFO streaming.StreamJob: To kill this job, run:
13/12/12 10:25:01 INFO streaming.StreamJob: /home/hduser/hadoop/libexec/../bin/hadoop job -Dmapred.job.tracker=hdfs://localhost:9001 -kill job_201312120020_0007
13/12/12 10:25:01 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201312120020_0007
13/12/12 10:25:02 INFO streaming.StreamJob: map 0% reduce 0%
13/12/12 10:25:05 INFO streaming.StreamJob: map 50% reduce 0%
13/12/12 10:25:06 INFO streaming.StreamJob: map 100% reduce 0%
13/12/12 10:25:13 INFO streaming.StreamJob: map 100% reduce 33%
13/12/12 10:25:14 INFO streaming.StreamJob: map 100% reduce 100%
13/12/12 10:25:15 INFO streaming.StreamJob: Job complete: job_201312120020_0007
13/12/12 10:25:15 INFO streaming.StreamJob: Output: /user/wc

Now, let’s look at the output file generated to see our results:

$ hadoop fs -cat /user/wc/part-00000

fish 1
on 1
sea 2
sells 2
she 2
shells 1
shore 1
the 1
too 1
where 1

The results are just as we expected. We have successfully built and executed a Hadoop MapReduce application written in Ruby using streaming.

You can find my screencast for Apache Hadoop Streaming here at www.hadoopscreencasts.com

[HOW TO] Fix PYCURL ERROR 6 – “Couldn’t resolve host ‘mirrorlist.centos.org’”

I have been using several VMs for simulating a multi-node environment. Most of my VMs are CentOS.
After installing CentOS 6.4 I got the following error when I tried “yum update“:

Could not retrieve mirrorlist http://mirrorlist.centos.org/?release=6&arch=x86_64&repo=os error was
14: PYCURL ERROR 6 - "Couldn't resolve host 'mirrorlist.centos.org'"
Error: Cannot find a valid baseurl for repo: base

To fix this error I updated NM_CONTROLLED to “no” in the file /etc/sysconfig/network-scripts/ifcfg-eth0
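For reference, after the change the relevant line in /etc/sysconfig/network-scripts/ifcfg-eth0 looks like this (the other lines in the file stay as they were):

NM_CONTROLLED="no"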

After this I restarted the network interface using the following commands:

ifdown eth0
ifup eth0

After doing the above, yum update started working.

Hope this helps.